/**
* Copyright (c) 2004 - 2009 Eike Stepper (Berlin, Germany) and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Simon McDuff - initial API and implementation
* Eike Stepper - maintenance
*/
package org.eclipse.emf.internal.cdo.transaction;
import org.eclipse.emf.cdo.common.id.CDOID;
import org.eclipse.emf.cdo.transaction.CDOSavepoint;
import org.eclipse.emf.cdo.transaction.CDOTransaction;
import org.eclipse.emf.cdo.transaction.CDOXATransaction;
import org.eclipse.emf.cdo.util.CDOUtil;
import org.eclipse.emf.cdo.view.CDOView;
import org.eclipse.emf.cdo.view.CDOViewSet;
import org.eclipse.emf.internal.cdo.bundle.OM;
import org.eclipse.emf.internal.cdo.messages.Messages;
import org.eclipse.net4j.util.CheckUtil;
import org.eclipse.net4j.util.WrappedException;
import org.eclipse.net4j.util.om.monitor.EclipseMonitor;
import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.net4j.util.om.monitor.EclipseMonitor.SynchronizedSubProgressMonitor;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.net4j.util.transaction.TransactionException;
import org.eclipse.emf.common.notify.Adapter;
import org.eclipse.emf.common.notify.Notification;
import org.eclipse.emf.common.notify.Notifier;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol;
import org.eclipse.emf.spi.cdo.CDOTransactionStrategy;
import org.eclipse.emf.spi.cdo.InternalCDOTransaction;
import org.eclipse.emf.spi.cdo.InternalCDOUserSavepoint;
import org.eclipse.emf.spi.cdo.InternalCDOXASavepoint;
import org.eclipse.emf.spi.cdo.InternalCDOXATransaction;
import org.eclipse.emf.spi.cdo.CDOSessionProtocol.CommitTransactionResult;
import org.eclipse.emf.spi.cdo.InternalCDOTransaction.InternalCDOCommitContext;
import org.eclipse.emf.spi.cdo.InternalCDOXATransaction.InternalCDOXACommitContext.CDOXAState;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.NullProgressMonitor;
import org.eclipse.core.runtime.SubProgressMonitor;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Three-phase commit.
*
* Phase 1 does the following for each CDOTransaction:
* - preCommit
* - Accumulate external temporary IDs.
* - Request the commit from the server.
* - The server registers the commit context and returns the final ID for each temporary ID.
*
* Phase 2 does the following for each CDOTransaction:
* - Transfer to the server the list of mappings from temporary external IDs to final external IDs that was
* accumulated previously.
* - Return to the client only when the commit process is ready to flush to disk (commit).
*
* Phase 3 does the following for each CDOTransaction:
* - Make modifications permanent.
* - PostCommit.
*
* If an exception occurs during phase 1 or phase 2, the commit will be cancelled for all {@link CDOTransaction}s
* included in the XA transaction. If an exception occurs during phase 3, the commit will be cancelled only for the
* {@link CDOTransaction} where the error happened.
*
* All {@link CDOTransaction}s included in the commit process need to have finished their phase before moving to the
* next phase. Within one phase, every {@link CDOTransaction} could have its own thread. It depends on the
* ExecutorService.
*
*
* @author Simon McDuff
* @since 2.0
*/
public class CDOXATransactionImpl implements InternalCDOXATransaction
{
private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_TRANSACTION, CDOXATransactionImpl.class);
private List transactions = new ArrayList();
private boolean allowRequestFromTransactionEnabled = true;
private ExecutorService executorService = createExecutorService();
private Map activeContext = new HashMap();
private Map> requestedCDOID = new HashMap>();
private InternalCDOXASavepoint lastSavepoint = createSavepoint(null);
private InternalCDOXASavepoint firstSavepoint = lastSavepoint;
private CDOTransactionStrategy transactionStrategy = createTransactionStrategy();
private CDOXAInternalAdapter internalAdapter = createInternalAdapter();
/**
 * Creates an XA transaction. The constructor itself does no work; all state is set up by the field initializers.
 */
public CDOXATransactionImpl()
{
  super();
}
/**
 * Returns the current value of the "allow request from transaction" flag.
 *
 * @return the value last passed to {@link #setAllowRequestFromTransactionEnabled(boolean)}, or {@code true} if it
 *         was never changed.
 */
public boolean isAllowRequestFromTransactionEnabled()
{
  return this.allowRequestFromTransactionEnabled;
}
/**
 * Stores a new value for the "allow request from transaction" flag.
 *
 * @param allRequest
 *          the new flag value.
 */
public void setAllowRequestFromTransactionEnabled(boolean allRequest)
{
  this.allowRequestFromTransactionEnabled = allRequest;
}
/**
 * Installs this XA transaction's strategy on the given transaction.
 *
 * @param transaction
 *          the transaction whose strategy is replaced.
 */
public void add(InternalCDOTransaction transaction)
{
  CDOTransactionStrategy strategy = this.transactionStrategy;
  transaction.setTransactionStrategy(strategy);
}
/**
 * Clears the strategy of the given transaction by setting it to {@code null}.
 *
 * @param transaction
 *          the transaction whose strategy is reset.
 */
public void remove(InternalCDOTransaction transaction)
{
  transaction.setTransactionStrategy(null);
}
/**
 * Attaches a view set to this XA transaction: registers the internal adapter on the view set and then calls
 * {@link #add(InternalCDOTransaction)} for every transaction returned by {@code getTransactions(viewSet)}.
 *
 * @param viewSet
 *          the view set to attach.
 * @throws IllegalArgumentException
 *           if {@link CDOUtil#getXATransaction(CDOViewSet)} already reports an XA transaction for the view set.
 */
public synchronized void add(CDOViewSet viewSet)
{
  if (CDOUtil.getXATransaction(viewSet) != null)
  {
    String message = Messages.getString("CDOXATransactionImpl.0"); //$NON-NLS-1$
    throw new IllegalArgumentException(message);
  }

  viewSet.eAdapters().add(internalAdapter);

  for (InternalCDOTransaction member : getTransactions(viewSet))
  {
    add(member);
  }
}
/**
 * Detaches a view set from this XA transaction: calls {@link #remove(InternalCDOTransaction)} for every transaction
 * returned by {@code getTransactions(viewSet)} and then unregisters the internal adapter from the view set.
 *
 * @param viewSet
 *          the view set to detach.
 * @throws IllegalArgumentException
 *           if {@link CDOUtil#getXATransaction(CDOViewSet)} does not report this instance for the view set.
 */
public synchronized void remove(CDOViewSet viewSet)
{
  if (CDOUtil.getXATransaction(viewSet) != this)
  {
    String message = Messages.getString("CDOXATransactionImpl.1"); //$NON-NLS-1$
    throw new IllegalArgumentException(message);
  }

  for (InternalCDOTransaction member : getTransactions(viewSet))
  {
    remove(member);
  }

  viewSet.eAdapters().remove(internalAdapter);
}
/**
 * Records an ID for the given transaction. The per-transaction set is created on first use; the whole update runs
 * while holding the monitor of the {@code requestedCDOID} map.
 *
 * @param transaction
 *          the transaction the ID is recorded for.
 * @param object
 *          the ID to record; adding the same ID twice has no further effect because the IDs are kept in a set.
 */
public void add(InternalCDOTransaction transaction, CDOID object)
{
  synchronized (requestedCDOID)
  {
    Set<CDOID> ids = requestedCDOID.get(transaction);
    if (ids == null)
    {
      // First ID for this transaction: create and register its set.
      ids = new HashSet<CDOID>();
      requestedCDOID.put(transaction, ids);
    }

    ids.add(object);
  }
}
/**
 * Returns a snapshot of the IDs recorded for the given transaction through
 * {@link #add(InternalCDOTransaction, CDOID)}.
 * <p>
 * The lookup and the copy are done while holding the monitor of the {@code requestedCDOID} map, the same monitor
 * that guards the updates, so a concurrent {@code add} cannot be observed half-way.
 *
 * @param transaction
 *          the transaction whose recorded IDs are requested.
 * @return a new array with the recorded IDs; an empty array if nothing was recorded for the transaction.
 */
public CDOID[] get(InternalCDOTransaction transaction)
{
  synchronized (requestedCDOID)
  {
    Set<CDOID> ids = requestedCDOID.get(transaction);
    if (ids == null)
    {
      // Nothing recorded for this transaction: report "no IDs" instead of failing with a NullPointerException.
      return new CDOID[0];
    }

    return ids.toArray(new CDOID[ids.size()]);
  }
}
/**
 * Looks up the commit context stored in {@code activeContext} for the given transaction.
 *
 * @param transaction
 *          the transaction to look up.
 * @return the stored commit context, or {@code null} if the map has no entry for the transaction.
 */
public InternalCDOXACommitContext getCommitContext(CDOTransaction transaction)
{
  InternalCDOXACommitContext context = activeContext.get(transaction);
  return context;
}
private void send(Collection xaContexts, final IProgressMonitor progressMonitor)
throws InterruptedException, ExecutionException
{
int xaContextSize = xaContexts.size();
progressMonitor.beginTask("", xaContextSize); //$NON-NLS-1$
try
{
Map, InternalCDOXACommitContext> futures = new HashMap, InternalCDOXACommitContext>();
for (InternalCDOXACommitContext xaContext : xaContexts)
{
xaContext.setProgressMonitor(new SynchronizedSubProgressMonitor(progressMonitor, 1));
Future