001 /* 002 * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org 003 * Copyright (C) 2011 NightLabs Consulting GmbH 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <http://www.gnu.org/licenses/>. 017 */ 018 package org.cumulus4j.store.crypto.keymanager.messagebroker.pmf; 019 020 import java.io.IOException; 021 import java.io.InputStream; 022 import java.lang.ref.WeakReference; 023 import java.util.Collection; 024 import java.util.Date; 025 import java.util.HashMap; 026 import java.util.LinkedList; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Properties; 030 import java.util.Random; 031 import java.util.Timer; 032 import java.util.TimerTask; 033 import java.util.concurrent.TimeoutException; 034 035 import javax.jdo.FetchPlan; 036 import javax.jdo.JDOHelper; 037 import javax.jdo.PersistenceManager; 038 import javax.jdo.PersistenceManagerFactory; 039 040 import org.cumulus4j.keymanager.back.shared.GetKeyRequest; 041 import org.cumulus4j.keymanager.back.shared.IdentifierUtil; 042 import org.cumulus4j.keymanager.back.shared.Message; 043 import org.cumulus4j.keymanager.back.shared.Request; 044 import org.cumulus4j.keymanager.back.shared.Response; 045 import org.cumulus4j.keymanager.back.shared.SystemPropertyUtil; 046 import org.cumulus4j.store.crypto.AbstractCryptoManager; 047 import org.cumulus4j.store.crypto.keymanager.messagebroker.AbstractMessageBroker; 048 import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBroker; 049 import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBrokerRegistry; 050 import org.cumulus4j.store.crypto.keymanager.rest.ErrorResponseException; 051 import org.slf4j.Logger; 052 import org.slf4j.LoggerFactory; 053 054 /** 055 * <p> 056 * {@link PersistenceManagerFactory}-backed implementation of {@link MessageBroker}. 057 * </p> 058 * <p> 059 * All {@link Message messages} are transferred via a shared database. Which database to be used can be 060 * configured by {@link #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX system properties}. 061 * </p> 062 * 063 * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de 064 */ 065 public class MessageBrokerPMF extends AbstractMessageBroker 066 { 067 private static final Logger logger = LoggerFactory.getLogger(MessageBrokerPMF.class); 068 069 /** 070 * Prefix for system properties used to configure the {@link PersistenceManagerFactory}. 071 * <p> 072 * Every system property that begins with {@value #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX} 073 * is passed (after truncating this prefix, of course) to the {@link JDOHelper#getPersistenceManagerFactory(Map)}. 074 * </p> 075 * <p> 076 * For example, to set the property "javax.jdo.option.ConnectionURL", you have to define the system 077 * property "cumulus4j.MessageBrokerPMF.persistenceProperties.javax.jdo.option.ConnectionURL". 078 * </p> 079 * <p> 080 * A set of defaults is loaded from a resource file, hence you do not need to configure everything, but 081 * without setting some basic coordinates (e.g. the JDBC URL), it is unlikely that your database server can be 082 * contacted. Of course, you could add an appropriate host record to your "/etc/hosts" 083 * and create a database with the name from our defaults on this host, but very likely you want to override these default 084 * coordinates: 085 * </p> 086 * <ul> 087 * <li>javax.jdo.option.ConnectionDriverName=com.mysql.jdbc.Driver</li> 088 * <li>javax.jdo.option.ConnectionURL=jdbc:mysql://cumulus4j-db/cumulus4jmessagebroker</li> 089 * </ul> 090 * <p> 091 * These defaults might be changed with a future version. 092 * </p> 093 */ 094 public static final String SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX = "cumulus4j.MessageBrokerPMF.persistenceProperties."; 095 096 /** 097 * <p> 098 * System property to control when the timer for cleaning up old {@link PendingRequest}s is called. The 099 * value configured here is a period, i.e. the timer will be triggered every X ms (roughly). 100 * </p><p> 101 * If this system property is not present (or not a valid number), the default is 3600000 (1 hour), which means 102 * the timer will wake up once every hour and call {@link #removeExpiredPendingRequests(boolean)} with <code>force = true</code>. 103 * </p><p> 104 * All <code>PendingRequest</code>s with a {@link PendingRequest#getLastStatusChangeTimestamp() lastStatusChangeTimestamp} 105 * being older than the {@link AbstractMessageBroker#getQueryTimeout() queryTimeout} (plus a safety margin of currently 106 * this period) are deleted. 107 * </p> 108 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED 109 */ 110 public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD = "cumulus4j.MessageBrokerPMF.cleanupTimer.period"; 111 112 /** 113 * <p> 114 * System property to control whether the timer for cleaning up old {@link PendingRequest}s should be enabled. The 115 * value configured here is a boolean value, i.e. it can be "true" or "false". 116 * </p><p> 117 * If it is disabled, the "normal" threads will do the clean-up-work periodically, when they run through 118 * {@link #_query(Class, Request)} or {@link #_pollRequest(String)}. 119 * </p> 120 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD 121 */ 122 public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED = "cumulus4j.MessageBrokerPMF.cleanupTimer.enabled"; 123 124 private long cleanupTimerPeriod = Long.MIN_VALUE; 125 126 private Boolean cleanupTimerEnabled = null; 127 128 protected long getCleanupTimerPeriod() 129 { 130 if (cleanupTimerPeriod < 0) { 131 final String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD; 132 String property = System.getProperty(propName); 133 long timeout = -1; 134 if (property != null && !property.isEmpty()) { 135 try { 136 timeout = Long.parseLong(property); 137 } catch (NumberFormatException x) { 138 logger.warn("Value \"{}\" of system property '{}' is not valid, because it cannot be parsed as number!", property, propName); 139 } 140 if (timeout <= 0) 141 logger.warn("Value \"{}\" of system property '{}' is not valid, because it is less than or equal to 0!", property, propName); 142 else { 143 logger.info("System property '{}' is specified with value {}.", propName, timeout); 144 cleanupTimerPeriod = timeout; 145 } 146 } 147 148 if (cleanupTimerPeriod < 0) { 149 timeout = 60L * 60L * 1000L; 150 cleanupTimerPeriod = timeout; 151 logger.info("System property '{}' is not specified; using default value {}.", propName, timeout); 152 } 153 } 154 155 return cleanupTimerPeriod; 156 } 157 158 /** 159 * <p> 160 * Get the enabled status of the timer used to cleanup. 161 * </p> 162 * <p> 163 * This value can be configured using the system property {@value #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED}. 164 * </p> 165 * 166 * @return the enabled status. 167 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD 168 * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED 169 */ 170 protected boolean getCleanupTimerEnabled() 171 { 172 Boolean val = cleanupTimerEnabled; 173 if (val == null) { 174 String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED; 175 String propVal = System.getProperty(propName); 176 propVal = propVal == null ? null : propVal.trim(); 177 if (propVal != null && !propVal.isEmpty()) { 178 if (propVal.equalsIgnoreCase(Boolean.TRUE.toString())) 179 val = Boolean.TRUE; 180 else if (propVal.equalsIgnoreCase(Boolean.FALSE.toString())) 181 val = Boolean.FALSE; 182 183 if (val == null) 184 logger.warn("System property '{}' is set to '{}', which is an ILLEGAL value. Falling back to default value.", propName, propVal); 185 else 186 logger.info("System property '{}' is set to '{}'.", propName, val); 187 } 188 189 if (val == null) { 190 val = Boolean.TRUE; 191 logger.info("System property '{}' is not set. Using default value '{}'.", propName, val); 192 } 193 194 cleanupTimerEnabled = val; 195 } 196 return val; 197 } 198 199 private static volatile Timer cleanupTimer = null; 200 private static volatile boolean cleanupTimerInitialised = false; 201 private volatile boolean cleanupTaskInitialised = false; 202 203 private static class CleanupTask extends TimerTask 204 { 205 private final Logger logger = LoggerFactory.getLogger(CleanupTask.class); 206 207 private final String thisID; 208 private WeakReference<MessageBrokerPMF> messageBrokerPMFRef; 209 private final long expiryTimerPeriodMSec; 210 211 public CleanupTask(MessageBrokerPMF messageBrokerPMF, long expiryTimerPeriodMSec) 212 { 213 if (messageBrokerPMF == null) 214 throw new IllegalArgumentException("messageBrokerPMF == null"); 215 216 this.thisID = messageBrokerPMF.thisID + '.' + Long.toString(System.identityHashCode(this), 36); 217 this.messageBrokerPMFRef = new WeakReference<MessageBrokerPMF>(messageBrokerPMF); 218 this.expiryTimerPeriodMSec = expiryTimerPeriodMSec; 219 } 220 221 @Override 222 public void run() { 223 try { 224 logger.debug("[{}] run: entered", thisID); 225 final MessageBrokerPMF messageBrokerPMF = messageBrokerPMFRef.get(); 226 if (messageBrokerPMF == null) { 227 logger.info("[{}] run: MessageBrokerPMF was garbage-collected. Cancelling this TimerTask.", thisID); 228 this.cancel(); 229 return; 230 } 231 232 messageBrokerPMF.removeExpiredPendingRequests(true); 233 234 long currentPeriodMSec = messageBrokerPMF.getCleanupTimerPeriod(); 235 if (currentPeriodMSec != expiryTimerPeriodMSec) { 236 logger.info( 237 "[{}] run: The expiryTimerPeriodMSec changed (oldValue={}, newValue={}). Re-scheduling this task.", 238 new Object[] { thisID, expiryTimerPeriodMSec, currentPeriodMSec } 239 ); 240 this.cancel(); 241 242 cleanupTimer.schedule(new CleanupTask(messageBrokerPMF, currentPeriodMSec), currentPeriodMSec, currentPeriodMSec); 243 } 244 } catch (Throwable x) { 245 // The TimerThread is cancelled, if a task throws an exception. Furthermore, they are not logged at all. 246 // Since we do not want the TimerThread to die, we catch everything (Throwable - not only Exception) and log 247 // it here. IMHO there's nothing better we can do. Marco :-) 248 logger.error("[" + thisID + "] run: " + x, x); 249 } 250 } 251 }; 252 253 private final void initTimerTaskOrRemoveExpiredPendingRequestsPeriodically() 254 { 255 if (!cleanupTimerInitialised) { 256 synchronized (AbstractCryptoManager.class) { 257 if (!cleanupTimerInitialised) { 258 if (getCleanupTimerEnabled()) 259 cleanupTimer = new Timer(MessageBrokerPMF.class.getSimpleName(), true); 260 261 cleanupTimerInitialised = true; 262 } 263 } 264 } 265 266 if (!cleanupTaskInitialised) { 267 synchronized (this) { 268 if (!cleanupTaskInitialised) { 269 if (cleanupTimer != null) { 270 long periodMSec = getCleanupTimerPeriod(); 271 cleanupTimer.schedule(new CleanupTask(this, periodMSec), periodMSec, periodMSec); 272 } 273 cleanupTaskInitialised = true; 274 } 275 } 276 } 277 278 if (cleanupTimer == null) { 279 logger.trace("[{}] initTimerTaskOrRemoveExpiredPendingRequestsPeriodically: No timer enabled => calling removeExpiredEntries(false) now.", thisID); 280 removeExpiredPendingRequests(false); 281 } 282 } 283 284 private Date lastRemoveExpiredPendingRequestsTimestamp = null; 285 286 private void removeExpiredPendingRequests(boolean force) 287 { 288 synchronized (this) { 289 if ( 290 !force && ( 291 lastRemoveExpiredPendingRequestsTimestamp != null && 292 lastRemoveExpiredPendingRequestsTimestamp.after(new Date(System.currentTimeMillis() - getCleanupTimerPeriod())) 293 ) 294 ) 295 { 296 logger.trace("[{}] removeExpiredPendingRequests: force == false and period not yet elapsed. Skipping.", thisID); 297 return; 298 } 299 300 lastRemoveExpiredPendingRequestsTimestamp = new Date(); 301 } 302 303 Date removePendingRequestsBeforeThisTimestamp = new Date( 304 System.currentTimeMillis() - getQueryTimeout() 305 // We use this cleanupTimerPeriod as a margin to prevent collisions with the code that still uses a PendingRequest 306 // and might right now (after the query-timeout) be about to delete it. Even though this time might thus 307 // be pretty long, it doesn't matter, if entries linger in the DB for a while as most are immediately cleaned up, anyway. 308 // This cleanup is only required for rare situations (e.g. when a JVM crashes). Otherwise our code should already 309 // ensure that objects are deleted immediately when they're not needed anymore. 310 // We might in the future replace the 'getCleanupTimerPeriod()' by a new system-property-controllable 311 // value (e.g. 'getCleanupDelay()'), though, to make it really nice & clean. But that's not important at all, IMHO. 312 // Marco :-) 313 - getCleanupTimerPeriod() 314 ); 315 316 try { 317 318 Integer deletedCount = null; 319 320 PersistenceManager pm = createTransactionalPersistenceManager(); 321 try { 322 Collection<PendingRequest> c = PendingRequest.getPendingRequestsWithLastStatusChangeTimestampOlderThanTimestamp( 323 pm, removePendingRequestsBeforeThisTimestamp 324 ); 325 326 if (logger.isDebugEnabled()) 327 deletedCount = c.size(); 328 329 pm.deletePersistentAll(c); 330 331 pm.currentTransaction().commit(); 332 } finally { 333 if (pm.currentTransaction().isActive()) 334 pm.currentTransaction().rollback(); 335 336 pm.close(); 337 } 338 339 logger.debug("[{}] removeExpiredPendingRequests: Deleted {} expired PendingRequest instances.", thisID, deletedCount); 340 341 } catch (Exception x) { 342 String errMsg = "[" + thisID + "] removeExpiredPendingRequests: Deleting the expired pending requests failed. This might *occasionally* happen due to the optimistic transaction handling (=> collisions). "; 343 if (logger.isDebugEnabled()) 344 logger.warn(errMsg + x, x); 345 else 346 logger.warn(errMsg + "Enable DEBUG logging to see the stack trace. " + x); 347 } 348 } 349 350 private PersistenceManagerFactory pmf; 351 352 private Random random = new Random(); 353 354 private final String thisID = Long.toString(System.identityHashCode(this), 36); 355 356 /** 357 * Create an instance of <code>MessageBrokerPMF</code>. You should not call this constructor directly, but 358 * instead use {@link MessageBrokerRegistry#getActiveMessageBroker()} to obtain the currently active {@link MessageBroker}. 359 */ 360 public MessageBrokerPMF() 361 { 362 logger.info("[{}] Instantiating MessageBrokerPMF.", thisID); 363 Properties propertiesRaw = new Properties(); 364 InputStream in = MessageBrokerPMF.class.getResourceAsStream("messagebroker-datanucleus.properties"); 365 try { 366 propertiesRaw.load(in); 367 in.close(); 368 } catch (IOException e) { 369 throw new RuntimeException(e); 370 } 371 372 for (Map.Entry<?, ?> me : System.getProperties().entrySet()) { 373 String key = String.valueOf(me.getKey()); 374 if (key.startsWith(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX)) 375 propertiesRaw.setProperty(key.substring(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX.length()), String.valueOf(me.getValue())); 376 } 377 378 Map<String, Object> properties = new HashMap<String, Object>(propertiesRaw.size()); 379 for (Map.Entry<?, ?> me : propertiesRaw.entrySet()) 380 properties.put(String.valueOf(me.getKey()), SystemPropertyUtil.resolveSystemProperties(String.valueOf(me.getValue()))); 381 382 Object connectionDriverNameObj = properties.get("javax.jdo.option.ConnectionDriverName"); 383 String connectionDriverName = connectionDriverNameObj == null ? null : connectionDriverNameObj.toString(); 384 logger.info("[{}] javax.jdo.option.ConnectionDriverName={}", thisID, connectionDriverName); 385 logger.info("[{}] javax.jdo.option.ConnectionURL={}", thisID, properties.get("javax.jdo.option.ConnectionURL")); 386 387 // With JDBC 4, it is not necessary anymore to load the driver: 388 // http://onjava.com/pub/a/onjava/2006/08/02/jjdbc-4-enhancements-in-java-se-6.html 389 // And since DN might use some other class loader, anyway, it does not even make any sense at all. 390 // Hence, I commented this out, again. Marco :-) 391 // ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); 392 // if (contextClassLoader != null) { // I think this is never null, but better check to play safe. Marco :-) 393 // try { 394 // Class.forName(connectionDriverName, true, contextClassLoader); 395 // logger.info("[{}] Loaded class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" successfully!"); 396 // } catch (ClassNotFoundException e) { 397 // logger.warn("[{}] Loading class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" failed: " + e, e); 398 // } 399 // } 400 401 pmf = JDOHelper.getPersistenceManagerFactory(properties); 402 // First create the structure in a separate tx (in case, the underlying DB/configuration requires this. 403 PersistenceManager pm = pmf.getPersistenceManager(); 404 try { 405 pm.currentTransaction().begin(); 406 pm.getExtent(PendingRequest.class); 407 pm.currentTransaction().commit(); 408 } finally { 409 if (pm.currentTransaction().isActive()) 410 pm.currentTransaction().rollback(); 411 412 pm.close(); 413 } 414 415 // Now test the DB access. 416 pm = pmf.getPersistenceManager(); 417 try { 418 pm.currentTransaction().begin(); 419 // Testing WRITE and READ access. 420 String cryptoSessionIDPrefix = IdentifierUtil.createRandomID(50); // using a length that is not used normally to prevent collisions with absolute certainty. 421 String cryptoSessionID = cryptoSessionIDPrefix + '*' + IdentifierUtil.createRandomID(10); 422 GetKeyRequest dummyRequest = new GetKeyRequest(cryptoSessionID, 1, "RSA", new byte[16]); 423 PendingRequest pendingRequest = new PendingRequest(dummyRequest); 424 pendingRequest = pm.makePersistent(pendingRequest); 425 pm.flush(); // Make sure, things are written NOW. 426 427 PendingRequest queriedPendingRequest = PendingRequest.getOldestPendingRequest(pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing); 428 if (!pendingRequest.equals(queriedPendingRequest)) 429 throw new IllegalStateException("Query did not find the PendingRequest instance, we just persisted for testing!"); 430 431 // And delete the garbage immediately again. 432 pm.deletePersistent(pendingRequest); 433 434 pm.currentTransaction().commit(); 435 } finally { 436 if (pm.currentTransaction().isActive()) 437 pm.currentTransaction().rollback(); 438 439 pm.close(); 440 } 441 logger.info("[{}] Successfully instantiated and tested MessageBrokerPMF.", thisID); 442 } 443 444 protected PersistenceManager createTransactionalPersistenceManager() 445 { 446 PersistenceManager pm = pmf.getPersistenceManager(); 447 pm.currentTransaction().begin(); 448 return pm; 449 } 450 451 @Override 452 protected Response _query(Class<? extends Response> responseClass, Request request) 453 throws TimeoutException, ErrorResponseException 454 { 455 String requestID = request.getRequestID(); 456 457 logger.debug("[{}] _query[requestID={}]: Entered with request: {}", new Object[] { thisID , requestID, request }); 458 459 initTimerTaskOrRemoveExpiredPendingRequestsPeriodically(); 460 461 PersistenceManager pm = createTransactionalPersistenceManager(); 462 try { 463 pm.makePersistent(new PendingRequest(request)); 464 pm.currentTransaction().commit(); 465 } finally { 466 if (pm.currentTransaction().isActive()) 467 pm.currentTransaction().rollback(); 468 469 pm.close(); 470 } 471 request = null; 472 473 logger.debug("[{}] _query[requestID={}]: Request persisted.", thisID, requestID); 474 475 // it would be nice if we could notify here, but this is not possible 476 477 478 // // BEGIN trying to produce collisions. 479 // try { 480 // Thread.sleep(1000L); 481 // } catch (InterruptedException e) { 482 // // ignore - only log - and break loop. 483 // logger.warn("_query: Thread.sleep(...) was interrupted with an InterruptedException."); 484 // } 485 // // END trying to produce collisions. 486 487 488 long beginTimestamp = System.currentTimeMillis(); 489 Response response = null; 490 do { 491 492 try { 493 Thread.sleep(100L); 494 // Thread.sleep(100L + random.nextInt(900)); // TODO make configurable?! 495 } catch (InterruptedException e) { 496 // ignore 497 } 498 499 logger.trace("[{}] _query[requestID={}]: Beginning tx.", thisID, requestID); 500 501 pm = createTransactionalPersistenceManager(); 502 try { 503 // We now use optimistic tx, hence setSerializeRead makes no sense anymore. 504 // pm.currentTransaction().setSerializeRead(true); 505 506 pm.getFetchPlan().setGroups(new String[] { FetchPlan.DEFAULT, PendingRequest.FetchGroup.response }); 507 PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, requestID); 508 if (pendingRequest == null) 509 logger.warn("_query[requestID={}]: Request is not found in the list of table of PendingRequest objects anymore.", requestID); 510 else { 511 switch (pendingRequest.getStatus()) { 512 case waitingForProcessing: 513 // nothing to do => wait! 514 break; 515 case currentlyBeingProcessed: 516 // nothing to do => wait! 517 break; 518 case completed: 519 response = pendingRequest.getResponse(); 520 if (response == null) 521 throw new IllegalStateException("pending.response is null, even though status is 'completed'!!!"); 522 break; 523 default: 524 throw new IllegalStateException("Unknown status: " + pendingRequest.getStatus()); 525 } 526 527 if (response != null) 528 pm.deletePersistent(pendingRequest); 529 } 530 531 if (response == null && System.currentTimeMillis() - beginTimestamp > getQueryTimeout()) { 532 logger.warn( 533 "[{}] _query[requestID={}]: Request for session {} was not answered within timeout. Current status is {}.", 534 new Object[] { 535 thisID, 536 requestID, 537 (pendingRequest == null ? null : pendingRequest.getRequest().getCryptoSessionID()), 538 (pendingRequest == null ? null : pendingRequest.getStatus()) 539 } 540 ); 541 542 if (pendingRequest != null) 543 pm.deletePersistent(pendingRequest); 544 545 pm.currentTransaction().commit(); 546 547 throw new TimeoutException("Request was not answered within timeout! requestID=" + requestID); 548 } 549 550 pm.currentTransaction().commit(); 551 } catch (Exception x) { 552 response = null; 553 logger.warn("[{}] _query[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() }); 554 } finally { 555 if (pm.currentTransaction().isActive()) 556 pm.currentTransaction().rollback(); 557 558 pm.close(); 559 } 560 561 logger.trace("[{}] _query[requestID={}]: Ended tx. response={}", new Object[] { thisID, requestID, response }); 562 563 } while (response == null); 564 565 return response; 566 } 567 568 // private Set<String> testCollisionDetection = Collections.synchronizedSet(new HashSet<String>()); 569 570 @Override 571 protected Request _pollRequest(String cryptoSessionIDPrefix) 572 { 573 logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Entered.", thisID, cryptoSessionIDPrefix); 574 575 long beginTimestamp = System.currentTimeMillis(); 576 577 initTimerTaskOrRemoveExpiredPendingRequestsPeriodically(); 578 579 Request request = null; 580 do { 581 logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Beginning tx.", thisID, cryptoSessionIDPrefix); 582 583 PersistenceManager pm = createTransactionalPersistenceManager(); 584 try { 585 // We now use optimistic tx, hence the following makes no sense anymore. 586 // pm.currentTransaction().setSerializeRead(true); 587 588 PendingRequest pendingRequest = PendingRequest.getOldestPendingRequest( 589 pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing 590 ); 591 592 593 // // BEGIN trying to produce collisions. 594 // try { 595 // Thread.sleep(500L); 596 // } catch (InterruptedException e) { 597 // // ignore - only log - and break loop. 598 // logger.warn("_pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException."); 599 // } 600 // // END trying to produce collisions. 601 602 603 if (pendingRequest != null) { 604 pendingRequest.setStatus(PendingRequestStatus.currentlyBeingProcessed); 605 request = pendingRequest.getRequest(); 606 } 607 608 pm.currentTransaction().commit(); 609 } catch (Exception x) { 610 request = null; 611 logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: {}", new Object[] { thisID, cryptoSessionIDPrefix, x.toString() }); 612 } finally { 613 if (pm.currentTransaction().isActive()) 614 pm.currentTransaction().rollback(); 615 616 pm.close(); 617 } 618 619 logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Ended tx. request={}", new Object[] { thisID, cryptoSessionIDPrefix, request }); 620 621 if (request == null) { 622 if (System.currentTimeMillis() - beginTimestamp > getPollRequestTimeout()) 623 break; 624 625 try { 626 Thread.sleep(50L + random.nextInt(50)); // TODO make configurable?! 627 } catch (InterruptedException e) { 628 // ignore - only log - and break loop. 629 logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID); 630 break; 631 } 632 } 633 } while (request == null); 634 635 // if (request != null && !testCollisionDetection.add(request.getRequestID())) 636 // logger.error("_pollRequest[cryptoSessionIDPrefix={}]: COLLISION!!! At least two threads process the same request! requestID={}", request.getRequestID()); 637 638 logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Returning request: {}", new Object[] { thisID, cryptoSessionIDPrefix, request }); 639 640 return request; 641 } 642 643 @Override 644 protected void _pushResponse(Response response) 645 { 646 if (response == null) 647 throw new IllegalArgumentException("response == null"); 648 649 if (response.getRequestID() == null) 650 throw new IllegalArgumentException("response.requestID == null"); 651 652 String requestID = response.getRequestID(); 653 654 logger.debug("[{}] _pushResponse[requestID={}]: Entered.", thisID, requestID); 655 656 List<Throwable> errors = new LinkedList<Throwable>(); 657 boolean successful; 658 for (int tryCounter = 0; tryCounter < 10; ++tryCounter) { 659 successful = false; 660 PersistenceManager pm = createTransactionalPersistenceManager(); 661 try { 662 // pm.currentTransaction().setSerializeRead(true); // Now using optimistic TX instead. 663 664 PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, response.getRequestID()); 665 if (pendingRequest == null || pendingRequest.getStatus() != PendingRequestStatus.currentlyBeingProcessed) 666 logger.warn("[{}] _pushResponse[requestID={}]: There is no request currently being processed with this requestID!!!", thisID, requestID); 667 else { 668 pendingRequest.setResponse(response); 669 pendingRequest.setStatus(PendingRequestStatus.completed); 670 } 671 672 pm.currentTransaction().commit(); successful = true; 673 } catch (Exception x) { 674 errors.add(x); 675 logger.warn("[{}] _pushResponse[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() }); 676 } finally { 677 if (pm.currentTransaction().isActive()) 678 pm.currentTransaction().rollback(); 679 680 pm.close(); 681 } 682 683 if (successful) { 684 errors.clear(); 685 break; 686 } 687 else { 688 // In case of an error, we wait a bit before trying it again. 689 try { 690 Thread.sleep(500L); 691 } catch (InterruptedException e) { 692 // ignore - only log - and break loop. 693 logger.warn("[{}] _pushResponse[requestID={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID, requestID); 694 break; 695 } 696 } 697 } 698 699 if (!errors.isEmpty()) { 700 Throwable lastError = null; 701 for (Throwable e : errors) { 702 lastError = e; 703 logger.warn("[" + thisID + "] _pushResponse[requestID=" + requestID + "]: " + e, e); 704 } 705 if (lastError instanceof RuntimeException) 706 throw (RuntimeException)lastError; 707 else 708 throw new RuntimeException(lastError); 709 } 710 } 711 712 }