001    /*
002     * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
003     * Copyright (C) 2011 NightLabs Consulting GmbH
004     *
005     * This program is free software: you can redistribute it and/or modify
006     * it under the terms of the GNU Affero General Public License as
007     * published by the Free Software Foundation, either version 3 of the
008     * License, or (at your option) any later version.
009     *
010     * This program is distributed in the hope that it will be useful,
011     * but WITHOUT ANY WARRANTY; without even the implied warranty of
012     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
013     * GNU Affero General Public License for more details.
014     *
015     * You should have received a copy of the GNU Affero General Public License
016     * along with this program.  If not, see <http://www.gnu.org/licenses/>.
017     */
018    package org.cumulus4j.store.crypto.keymanager.messagebroker.pmf;
019    
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.lang.ref.WeakReference;
023    import java.util.Collection;
024    import java.util.Date;
025    import java.util.HashMap;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Properties;
030    import java.util.Random;
031    import java.util.Timer;
032    import java.util.TimerTask;
033    import java.util.concurrent.TimeoutException;
034    
035    import javax.jdo.FetchPlan;
036    import javax.jdo.JDOHelper;
037    import javax.jdo.PersistenceManager;
038    import javax.jdo.PersistenceManagerFactory;
039    
040    import org.cumulus4j.keymanager.back.shared.GetKeyRequest;
041    import org.cumulus4j.keymanager.back.shared.IdentifierUtil;
042    import org.cumulus4j.keymanager.back.shared.Message;
043    import org.cumulus4j.keymanager.back.shared.Request;
044    import org.cumulus4j.keymanager.back.shared.Response;
045    import org.cumulus4j.keymanager.back.shared.SystemPropertyUtil;
046    import org.cumulus4j.store.crypto.AbstractCryptoManager;
047    import org.cumulus4j.store.crypto.keymanager.messagebroker.AbstractMessageBroker;
048    import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBroker;
049    import org.cumulus4j.store.crypto.keymanager.messagebroker.MessageBrokerRegistry;
050    import org.cumulus4j.store.crypto.keymanager.rest.ErrorResponseException;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * <p>
056     * {@link PersistenceManagerFactory}-backed implementation of {@link MessageBroker}.
057     * </p>
058     * <p>
059     * All {@link Message messages} are transferred via a shared database. Which database to be used can be
060     * configured by {@link #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX system properties}.
061     * </p>
062     *
063     * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
064     */
065    public class MessageBrokerPMF extends AbstractMessageBroker
066    {
067            private static final Logger logger = LoggerFactory.getLogger(MessageBrokerPMF.class);
068    
069            /**
070             * Prefix for system properties used to configure the {@link PersistenceManagerFactory}.
071             * <p>
072             * Every system property that begins with {@value #SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX}
073             * is passed (after truncating this prefix, of course) to the {@link JDOHelper#getPersistenceManagerFactory(Map)}.
074             * </p>
075             * <p>
076             * For example, to set the property "javax.jdo.option.ConnectionURL", you have to define the system
077             * property "cumulus4j.MessageBrokerPMF.persistenceProperties.javax.jdo.option.ConnectionURL".
078             * </p>
079             * <p>
080             * A set of defaults is loaded from a resource file, hence you do not need to configure everything, but
081             * without setting some basic coordinates (e.g. the JDBC URL), it is unlikely that your database server can be
082             * contacted. Of course, you could add an appropriate host record to your "/etc/hosts"
083             * and create a database with the name from our defaults on this host, but very likely you want to override these default
084             * coordinates:
085             * </p>
086             * <ul>
087             * <li>javax.jdo.option.ConnectionDriverName=com.mysql.jdbc.Driver</li>
088             * <li>javax.jdo.option.ConnectionURL=jdbc:mysql://cumulus4j-db/cumulus4jmessagebroker</li>
089             * </ul>
090             * <p>
091             * These defaults might be changed with a future version.
092             * </p>
093             */
094            public static final String SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX = "cumulus4j.MessageBrokerPMF.persistenceProperties.";
095    
096            /**
097             * <p>
098             * System property to control when the timer for cleaning up old {@link PendingRequest}s is called. The
099             * value configured here is a period, i.e. the timer will be triggered every X ms (roughly).
100             * </p><p>
101             * If this system property is not present (or not a valid number), the default is 3600000 (1 hour), which means
102             * the timer will wake up once every hour and call {@link #removeExpiredPendingRequests(boolean)} with <code>force = true</code>.
103             * </p><p>
104             * All <code>PendingRequest</code>s with a {@link PendingRequest#getLastStatusChangeTimestamp() lastStatusChangeTimestamp}
105             * being older than the {@link AbstractMessageBroker#getQueryTimeout() queryTimeout} (plus a safety margin of currently
106             * this period) are deleted.
107             * </p>
108             * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED
109             */
110            public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD = "cumulus4j.MessageBrokerPMF.cleanupTimer.period";
111    
112            /**
113             * <p>
114             * System property to control whether the timer for cleaning up old {@link PendingRequest}s should be enabled. The
115             * value configured here is a boolean value, i.e. it can be "true" or "false".
116             * </p><p>
117             * If it is disabled, the "normal" threads will do the clean-up-work periodically, when they run through
118             * {@link #_query(Class, Request)} or {@link #_pollRequest(String)}.
119             * </p>
120             * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD
121             */
122            public static final String SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED = "cumulus4j.MessageBrokerPMF.cleanupTimer.enabled";
123    
124            private long cleanupTimerPeriod = Long.MIN_VALUE;
125    
126            private Boolean cleanupTimerEnabled = null;
127    
128            protected long getCleanupTimerPeriod()
129            {
130                    if (cleanupTimerPeriod < 0) {
131                            final String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD;
132                            String property = System.getProperty(propName);
133                            long timeout = -1;
134                            if (property != null && !property.isEmpty()) {
135                                    try {
136                                            timeout = Long.parseLong(property);
137                                    } catch (NumberFormatException x) {
138                                            logger.warn("Value \"{}\" of system property '{}' is not valid, because it cannot be parsed as number!", property, propName);
139                                    }
140                                    if (timeout <= 0)
141                                            logger.warn("Value \"{}\" of system property '{}' is not valid, because it is less than or equal to 0!", property, propName);
142                                    else {
143                                            logger.info("System property '{}' is specified with value {}.", propName, timeout);
144                                            cleanupTimerPeriod = timeout;
145                                    }
146                            }
147    
148                            if (cleanupTimerPeriod < 0) {
149                                    timeout = 60L * 60L * 1000L;
150                                    cleanupTimerPeriod = timeout;
151                                    logger.info("System property '{}' is not specified; using default value {}.", propName, timeout);
152                            }
153                    }
154    
155                    return cleanupTimerPeriod;
156            }
157    
158            /**
159             * <p>
160             * Get the enabled status of the timer used to cleanup.
161             * </p>
162             * <p>
163             * This value can be configured using the system property {@value #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED}.
164             * </p>
165             *
166             * @return the enabled status.
167             * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_PERIOD
168             * @see #SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED
169             */
170            protected boolean getCleanupTimerEnabled()
171            {
172                    Boolean val = cleanupTimerEnabled;
173                    if (val == null) {
174                            String propName = SYSTEM_PROPERTY_CLEANUP_TIMER_ENABLED;
175                            String propVal = System.getProperty(propName);
176                            propVal = propVal == null ? null : propVal.trim();
177                            if (propVal != null && !propVal.isEmpty()) {
178                                    if (propVal.equalsIgnoreCase(Boolean.TRUE.toString()))
179                                            val = Boolean.TRUE;
180                                    else if (propVal.equalsIgnoreCase(Boolean.FALSE.toString()))
181                                            val = Boolean.FALSE;
182    
183                                    if (val == null)
184                                            logger.warn("System property '{}' is set to '{}', which is an ILLEGAL value. Falling back to default value.", propName, propVal);
185                                    else
186                                            logger.info("System property '{}' is set to '{}'.", propName, val);
187                            }
188    
189                            if (val == null) {
190                                    val = Boolean.TRUE;
191                                    logger.info("System property '{}' is not set. Using default value '{}'.", propName, val);
192                            }
193    
194                            cleanupTimerEnabled = val;
195                    }
196                    return val;
197            }
198    
199            private static volatile Timer cleanupTimer = null;
200            private static volatile boolean cleanupTimerInitialised = false;
201            private volatile boolean cleanupTaskInitialised = false;
202    
203            private static class CleanupTask extends TimerTask
204            {
205                    private final Logger logger = LoggerFactory.getLogger(CleanupTask.class);
206    
207                    private final String thisID;
208                    private WeakReference<MessageBrokerPMF> messageBrokerPMFRef;
209                    private final long expiryTimerPeriodMSec;
210    
211                    public CleanupTask(MessageBrokerPMF messageBrokerPMF, long expiryTimerPeriodMSec)
212                    {
213                            if (messageBrokerPMF == null)
214                                    throw new IllegalArgumentException("messageBrokerPMF == null");
215    
216                            this.thisID = messageBrokerPMF.thisID + '.' + Long.toString(System.identityHashCode(this), 36);
217                            this.messageBrokerPMFRef = new WeakReference<MessageBrokerPMF>(messageBrokerPMF);
218                            this.expiryTimerPeriodMSec = expiryTimerPeriodMSec;
219                    }
220    
221                    @Override
222                    public void run() {
223                            try {
224                                    logger.debug("[{}] run: entered", thisID);
225                                    final MessageBrokerPMF messageBrokerPMF = messageBrokerPMFRef.get();
226                                    if (messageBrokerPMF == null) {
227                                            logger.info("[{}] run: MessageBrokerPMF was garbage-collected. Cancelling this TimerTask.", thisID);
228                                            this.cancel();
229                                            return;
230                                    }
231    
232                                    messageBrokerPMF.removeExpiredPendingRequests(true);
233    
234                                    long currentPeriodMSec = messageBrokerPMF.getCleanupTimerPeriod();
235                                    if (currentPeriodMSec != expiryTimerPeriodMSec) {
236                                            logger.info(
237                                                            "[{}] run: The expiryTimerPeriodMSec changed (oldValue={}, newValue={}). Re-scheduling this task.",
238                                                            new Object[] { thisID, expiryTimerPeriodMSec, currentPeriodMSec }
239                                            );
240                                            this.cancel();
241    
242                                            cleanupTimer.schedule(new CleanupTask(messageBrokerPMF, currentPeriodMSec), currentPeriodMSec, currentPeriodMSec);
243                                    }
244                            } catch (Throwable x) {
245                                    // The TimerThread is cancelled, if a task throws an exception. Furthermore, they are not logged at all.
246                                    // Since we do not want the TimerThread to die, we catch everything (Throwable - not only Exception) and log
247                                    // it here. IMHO there's nothing better we can do. Marco :-)
248                                    logger.error("[" + thisID + "] run: " + x, x);
249                            }
250                    }
251            };
252    
253            private final void initTimerTaskOrRemoveExpiredPendingRequestsPeriodically()
254            {
255                    if (!cleanupTimerInitialised) {
256                            synchronized (AbstractCryptoManager.class) {
257                                    if (!cleanupTimerInitialised) {
258                                            if (getCleanupTimerEnabled())
259                                                    cleanupTimer = new Timer();
260    
261                                            cleanupTimerInitialised = true;
262                                    }
263                            }
264                    }
265    
266                    if (!cleanupTaskInitialised) {
267                            synchronized (this) {
268                                    if (!cleanupTaskInitialised) {
269                                            if (cleanupTimer != null) {
270                                                    long periodMSec = getCleanupTimerPeriod();
271                                                    cleanupTimer.schedule(new CleanupTask(this, periodMSec), periodMSec, periodMSec);
272                                            }
273                                            cleanupTaskInitialised = true;
274                                    }
275                            }
276                    }
277    
278                    if (cleanupTimer == null) {
279                            logger.trace("[{}] initTimerTaskOrRemoveExpiredPendingRequestsPeriodically: No timer enabled => calling removeExpiredEntries(false) now.", thisID);
280                            removeExpiredPendingRequests(false);
281                    }
282            }
283    
284            private Date lastRemoveExpiredPendingRequestsTimestamp = null;
285    
286            private void removeExpiredPendingRequests(boolean force)
287            {
288                    synchronized (this) {
289                            if (
290                                            !force && (
291                                                            lastRemoveExpiredPendingRequestsTimestamp != null &&
292                                                            lastRemoveExpiredPendingRequestsTimestamp.after(new Date(System.currentTimeMillis() - getCleanupTimerPeriod()))
293                                            )
294                            )
295                            {
296                                    logger.trace("[{}] removeExpiredPendingRequests: force == false and period not yet elapsed. Skipping.", thisID);
297                                    return;
298                            }
299    
300                            lastRemoveExpiredPendingRequestsTimestamp = new Date();
301                    }
302    
303                    Date removePendingRequestsBeforeThisTimestamp = new Date(
304                                    System.currentTimeMillis() - getQueryTimeout()
305                                    // We use this cleanupTimerPeriod as a margin to prevent collisions with the code that still uses a PendingRequest
306                                    // and might right now (after the query-timeout) be about to delete it. Even though this time might thus
307                                    // be pretty long, it doesn't matter, if entries linger in the DB for a while as most are immediately cleaned up, anyway.
308                                    // This cleanup is only required for rare situations (e.g. when a JVM crashes). Otherwise our code should already
309                                    // ensure that objects are deleted immediately when they're not needed anymore.
310                                    // We might in the future replace the 'getCleanupTimerPeriod()' by a new system-property-controllable
311                                    // value (e.g. 'getCleanupDelay()'), though, to make it really nice & clean. But that's not important at all, IMHO.
312                                    // Marco :-)
313                                    - getCleanupTimerPeriod()
314                    );
315    
316                    try {
317    
318                            Integer deletedCount = null;
319    
320                            PersistenceManager pm = createTransactionalPersistenceManager();
321                            try {
322                                    Collection<PendingRequest> c = PendingRequest.getPendingRequestsWithLastStatusChangeTimestampOlderThanTimestamp(
323                                                    pm, removePendingRequestsBeforeThisTimestamp
324                                    );
325    
326                                    if (logger.isDebugEnabled())
327                                            deletedCount = c.size();
328    
329                                    pm.deletePersistentAll(c);
330    
331                                    pm.currentTransaction().commit();
332                            } finally {
333                                    if (pm.currentTransaction().isActive())
334                                            pm.currentTransaction().rollback();
335    
336                                    pm.close();
337                            }
338    
339                            logger.debug("[{}] removeExpiredPendingRequests: Deleted {} expired PendingRequest instances.", thisID, deletedCount);
340    
341                    } catch (Exception x) {
342                            String errMsg = "[" + thisID + "] removeExpiredPendingRequests: Deleting the expired pending requests failed. This might *occasionally* happen due to the optimistic transaction handling (=> collisions). ";
343                            if (logger.isDebugEnabled())
344                                    logger.warn(errMsg + x, x);
345                            else
346                                    logger.warn(errMsg + "Enable DEBUG logging to see the stack trace. " + x);
347                    }
348            }
349    
350            private PersistenceManagerFactory pmf;
351    
352            private Random random = new Random();
353    
354            private final String thisID = Long.toString(System.identityHashCode(this), 36);
355    
356            /**
357             * Create an instance of <code>MessageBrokerPMF</code>. You should not call this constructor directly, but
358             * instead use {@link MessageBrokerRegistry#getActiveMessageBroker()} to obtain the currently active {@link MessageBroker}.
359             */
360            public MessageBrokerPMF()
361            {
362                    logger.info("[{}] Instantiating MessageBrokerPMF.", thisID);
363                    Properties propertiesRaw = new Properties();
364                    InputStream in = MessageBrokerPMF.class.getResourceAsStream("messagebroker-datanucleus.properties");
365                    try {
366                            propertiesRaw.load(in);
367                            in.close();
368                    } catch (IOException e) {
369                            throw new RuntimeException(e);
370                    }
371    
372                    for (Map.Entry<?, ?> me : System.getProperties().entrySet()) {
373                            String key = String.valueOf(me.getKey());
374                            if (key.startsWith(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX))
375                                    propertiesRaw.setProperty(key.substring(SYSTEM_PROPERTY_PERSISTENCE_PROPERTIES_PREFIX.length()), String.valueOf(me.getValue()));
376                    }
377    
378                    Map<String, Object> properties = new HashMap<String, Object>(propertiesRaw.size());
379                    for (Map.Entry<?, ?> me : propertiesRaw.entrySet())
380                            properties.put(String.valueOf(me.getKey()), SystemPropertyUtil.resolveSystemProperties(String.valueOf(me.getValue())));
381    
382                    Object connectionDriverNameObj = properties.get("javax.jdo.option.ConnectionDriverName");
383                    String connectionDriverName = connectionDriverNameObj == null ? null : connectionDriverNameObj.toString();
384                    logger.info("[{}] javax.jdo.option.ConnectionDriverName={}", thisID, connectionDriverName);
385                    logger.info("[{}] javax.jdo.option.ConnectionURL={}", thisID, properties.get("javax.jdo.option.ConnectionURL"));
386    
387                    // With JDBC 4, it is not necessary anymore to load the driver:
388                    // http://onjava.com/pub/a/onjava/2006/08/02/jjdbc-4-enhancements-in-java-se-6.html
389                    // And since DN might use some other class loader, anyway, it does not even make any sense at all.
390                    // Hence, I commented this out, again. Marco :-)
391    //              ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
392    //              if (contextClassLoader != null) { // I think this is never null, but better check to play safe. Marco :-)
393    //                      try {
394    //                              Class.forName(connectionDriverName, true, contextClassLoader);
395    //                              logger.info("[{}] Loaded class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" successfully!");
396    //                      } catch (ClassNotFoundException e) {
397    //                              logger.warn("[{}] Loading class \"" + connectionDriverName + "\" with contextClassLoader=\"" + contextClassLoader + "\" failed: " + e, e);
398    //                      }
399    //              }
400    
401                    pmf = JDOHelper.getPersistenceManagerFactory(properties);
402                    // First create the structure in a separate tx (in case, the underlying DB/configuration requires this.
403                    PersistenceManager pm = pmf.getPersistenceManager();
404                    try {
405                            pm.currentTransaction().begin();
406                            pm.getExtent(PendingRequest.class);
407                            pm.currentTransaction().commit();
408                    } finally {
409                            if (pm.currentTransaction().isActive())
410                                    pm.currentTransaction().rollback();
411    
412                            pm.close();
413                    }
414    
415                    // Now test the DB access.
416                    pm = pmf.getPersistenceManager();
417                    try {
418                            pm.currentTransaction().begin();
419                            // Testing WRITE and READ access.
420                            String cryptoSessionIDPrefix = IdentifierUtil.createRandomID(50); // using a length that is not used normally to prevent collisions with absolute certainty.
421                            String cryptoSessionID = cryptoSessionIDPrefix + '.' + IdentifierUtil.createRandomID(10);
422                            GetKeyRequest dummyRequest = new GetKeyRequest(cryptoSessionID, 1, "RSA", new byte[16]);
423                            PendingRequest pendingRequest = new PendingRequest(dummyRequest);
424                            pendingRequest = pm.makePersistent(pendingRequest);
425                            pm.flush(); // Make sure, things are written NOW.
426    
427                            PendingRequest queriedPendingRequest = PendingRequest.getOldestPendingRequest(pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing);
428                            if (!pendingRequest.equals(queriedPendingRequest))
429                                    throw new IllegalStateException("Query did not find the PendingRequest instance, we just persisted for testing!");
430    
431                            // And delete the garbage immediately again.
432                            pm.deletePersistent(pendingRequest);
433    
434                            pm.currentTransaction().commit();
435                    } finally {
436                            if (pm.currentTransaction().isActive())
437                                    pm.currentTransaction().rollback();
438    
439                            pm.close();
440                    }
441                    logger.info("[{}] Successfully instantiated and tested MessageBrokerPMF.", thisID);
442            }
443    
444            protected PersistenceManager createTransactionalPersistenceManager()
445            {
446                    PersistenceManager pm = pmf.getPersistenceManager();
447                    pm.currentTransaction().begin();
448                    return pm;
449            }
450    
451            @Override
452            protected Response _query(Class<? extends Response> responseClass, Request request)
453            throws TimeoutException, ErrorResponseException
454            {
455                    String requestID = request.getRequestID();
456    
457                    logger.debug("[{}] _query[requestID={}]: Entered with request: {}", new Object[] { thisID , requestID, request });
458    
459                    initTimerTaskOrRemoveExpiredPendingRequestsPeriodically();
460    
461                    PersistenceManager pm = createTransactionalPersistenceManager();
462                    try {
463                            pm.makePersistent(new PendingRequest(request));
464                            pm.currentTransaction().commit();
465                    } finally {
466                            if (pm.currentTransaction().isActive())
467                                    pm.currentTransaction().rollback();
468    
469                            pm.close();
470                    }
471                    request = null;
472    
473                    logger.debug("[{}] _query[requestID={}]: Request persisted.", thisID, requestID);
474    
475                    // it would be nice if we could notify here, but this is not possible
476    
477    
478    //              // BEGIN trying to produce collisions.
479    //              try {
480    //                      Thread.sleep(1000L);
481    //              } catch (InterruptedException e) {
482    //                      // ignore - only log - and break loop.
483    //                      logger.warn("_query: Thread.sleep(...) was interrupted with an InterruptedException.");
484    //              }
485    //              // END trying to produce collisions.
486    
487    
488                    long beginTimestamp = System.currentTimeMillis();
489                    Response response = null;
490                    do {
491    
492                            try {
493                                    Thread.sleep(100L);
494    //                              Thread.sleep(100L + random.nextInt(900)); // TODO make configurable?!
495                            } catch (InterruptedException e) {
496                                    // ignore
497                            }
498    
499                            logger.trace("[{}] _query[requestID={}]: Beginning tx.", thisID, requestID);
500    
501                            pm = createTransactionalPersistenceManager();
502                            try {
503                                    // We now use optimistic tx, hence setSerializeRead makes no sense anymore.
504    //                              pm.currentTransaction().setSerializeRead(true);
505    
506                                    pm.getFetchPlan().setGroups(new String[] { FetchPlan.DEFAULT, PendingRequest.FetchGroup.response });
507                                    PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, requestID);
508                                    if (pendingRequest == null)
509                                            logger.warn("_query[requestID={}]: Request is not found in the list of table of PendingRequest objects anymore.", requestID);
510                                    else {
511                                            switch (pendingRequest.getStatus()) {
512                                                    case waitingForProcessing:
513                                                            // nothing to do => wait!
514                                                            break;
515                                                    case currentlyBeingProcessed:
516                                                            // nothing to do => wait!
517                                                            break;
518                                                    case completed:
519                                                            response = pendingRequest.getResponse();
520                                                            if (response == null)
521                                                                    throw new IllegalStateException("pending.response is null, even though status is 'completed'!!!");
522                                                            break;
523                                                    default:
524                                                            throw new IllegalStateException("Unknown status: " + pendingRequest.getStatus());
525                                            }
526    
527                                            if (response != null)
528                                                    pm.deletePersistent(pendingRequest);
529                                    }
530    
531                                    if (response == null && System.currentTimeMillis() - beginTimestamp > getQueryTimeout()) {
532                                            logger.warn(
533                                                            "[{}] _query[requestID={}]: Request for session {} was not answered within timeout. Current status is {}.",
534                                                            new Object[] {
535                                                                            thisID,
536                                                                            requestID,
537                                                                            (pendingRequest == null ? null : pendingRequest.getRequest().getCryptoSessionID()),
538                                                                            (pendingRequest == null ? null : pendingRequest.getStatus())
539                                                            }
540                                            );
541    
542                                            if (pendingRequest != null)
543                                                    pm.deletePersistent(pendingRequest);
544    
545                                            pm.currentTransaction().commit();
546    
547                                            throw new TimeoutException("Request was not answered within timeout! requestID=" + requestID);
548                                    }
549    
550                                    pm.currentTransaction().commit();
551                            } catch (Exception x) {
552                                    response = null;
553                                    logger.warn("[{}] _query[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() });
554                            } finally {
555                                    if (pm.currentTransaction().isActive())
556                                            pm.currentTransaction().rollback();
557    
558                                    pm.close();
559                            }
560    
561                            logger.trace("[{}] _query[requestID={}]: Ended tx. response={}", new Object[] { thisID, requestID, response });
562    
563                    } while (response == null);
564    
565                    return response;
566            }
567    
568    //      private Set<String> testCollisionDetection = Collections.synchronizedSet(new HashSet<String>());
569    
570            @Override
571            protected Request _pollRequest(String cryptoSessionIDPrefix)
572            {
573                    logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Entered.", thisID, cryptoSessionIDPrefix);
574    
575                    long beginTimestamp = System.currentTimeMillis();
576    
577                    initTimerTaskOrRemoveExpiredPendingRequestsPeriodically();
578    
579                    Request request = null;
580                    do {
581                            logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Beginning tx.", thisID, cryptoSessionIDPrefix);
582    
583                            PersistenceManager pm = createTransactionalPersistenceManager();
584                            try {
585                            // We now use optimistic tx, hence the following makes no sense anymore.
586    //                              pm.currentTransaction().setSerializeRead(true);
587    
588                                    PendingRequest pendingRequest = PendingRequest.getOldestPendingRequest(
589                                                    pm, cryptoSessionIDPrefix, PendingRequestStatus.waitingForProcessing
590                                    );
591    
592    
593    //                              // BEGIN trying to produce collisions.
594    //                              try {
595    //                                      Thread.sleep(500L);
596    //                              } catch (InterruptedException e) {
597    //                                      // ignore - only log - and break loop.
598    //                                      logger.warn("_pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException.");
599    //                              }
600    //                              // END trying to produce collisions.
601    
602    
603                                    if (pendingRequest != null) {
604                                            pendingRequest.setStatus(PendingRequestStatus.currentlyBeingProcessed);
605                                            request = pendingRequest.getRequest();
606                                    }
607    
608                                    pm.currentTransaction().commit();
609                            } catch (Exception x) {
610                                    request = null;
611                                    logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: {}", new Object[] { thisID, cryptoSessionIDPrefix, x.toString() });
612                            } finally {
613                                    if (pm.currentTransaction().isActive())
614                                            pm.currentTransaction().rollback();
615    
616                                    pm.close();
617                            }
618    
619                            logger.trace("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Ended tx. request={}", new Object[] { thisID, cryptoSessionIDPrefix, request });
620    
621                            if (request == null) {
622                                    if (System.currentTimeMillis() - beginTimestamp > getPollRequestTimeout())
623                                            break;
624    
625                                    try {
626                                            Thread.sleep(50L + random.nextInt(50)); // TODO make configurable?!
627                                    } catch (InterruptedException e) {
628                                            // ignore - only log - and break loop.
629                                            logger.warn("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID);
630                                            break;
631                                    }
632                            }
633                    } while (request == null);
634    
635    //              if (request != null && !testCollisionDetection.add(request.getRequestID()))
636    //                      logger.error("_pollRequest[cryptoSessionIDPrefix={}]: COLLISION!!! At least two threads process the same request! requestID={}", request.getRequestID());
637    
638                    logger.debug("[{}] _pollRequest[cryptoSessionIDPrefix={}]: Returning request: {}", new Object[] { thisID, cryptoSessionIDPrefix, request });
639    
640                    return request;
641            }
642    
643            @Override
644            protected void _pushResponse(Response response)
645            {
646                    if (response == null)
647                            throw new IllegalArgumentException("response == null");
648    
649                    if (response.getRequestID() == null)
650                            throw new IllegalArgumentException("response.requestID == null");
651    
652                    String requestID = response.getRequestID();
653    
654                    logger.debug("[{}] _pushResponse[requestID={}]: Entered.", thisID, requestID);
655    
656                    List<Throwable> errors = new LinkedList<Throwable>();
657                    boolean successful;
658                    for (int tryCounter = 0; tryCounter < 10; ++tryCounter) {
659                            successful = false;
660                            PersistenceManager pm = createTransactionalPersistenceManager();
661                            try {
662    //                              pm.currentTransaction().setSerializeRead(true); // Now using optimistic TX instead.
663    
664                                    PendingRequest pendingRequest = PendingRequest.getPendingRequest(pm, response.getRequestID());
665                                    if (pendingRequest == null || pendingRequest.getStatus() != PendingRequestStatus.currentlyBeingProcessed)
666                                            logger.warn("[{}] _pushResponse[requestID={}]: There is no request currently being processed with this requestID!!!", thisID, requestID);
667                                    else {
668                                            pendingRequest.setResponse(response);
669                                            pendingRequest.setStatus(PendingRequestStatus.completed);
670                                    }
671    
672                                    pm.currentTransaction().commit(); successful = true;
673                            } catch (Exception x) {
674                                    errors.add(x);
675                                    logger.warn("[{}] _pushResponse[requestID={}]: {}", new Object[] { thisID, requestID, x.toString() });
676                            } finally {
677                                    if (pm.currentTransaction().isActive())
678                                            pm.currentTransaction().rollback();
679    
680                                    pm.close();
681                            }
682    
683                            if (successful) {
684                                    errors.clear();
685                                    break;
686                            }
687                            else {
688                                    // In case of an error, we wait a bit before trying it again.
689                                    try {
690                                            Thread.sleep(500L);
691                                    } catch (InterruptedException e) {
692                                            // ignore - only log - and break loop.
693                                            logger.warn("[{}] _pushResponse[requestID={}]: Thread.sleep(...) was interrupted with an InterruptedException.", thisID, requestID);
694                                            break;
695                                    }
696                            }
697                    }
698    
699                    if (!errors.isEmpty()) {
700                            Throwable lastError = null;
701                            for (Throwable e : errors) {
702                                    lastError = e;
703                                    logger.warn("[" + thisID + "] _pushResponse[requestID=" + requestID + "]: " + e, e);
704                            }
705                            if (lastError instanceof RuntimeException)
706                                    throw (RuntimeException)lastError;
707                            else
708                                    throw new RuntimeException(lastError);
709                    }
710            }
711    
712    }