diff options
Diffstat (limited to 'rms/org.eclipse.ptp.rm.smoa.core/src/org/eclipse/ptp/rm/smoa/core/rmsystem/JobThread.java')
-rw-r--r-- | rms/org.eclipse.ptp.rm.smoa.core/src/org/eclipse/ptp/rm/smoa/core/rmsystem/JobThread.java | 442 |
1 files changed, 0 insertions, 442 deletions
diff --git a/rms/org.eclipse.ptp.rm.smoa.core/src/org/eclipse/ptp/rm/smoa/core/rmsystem/JobThread.java b/rms/org.eclipse.ptp.rm.smoa.core/src/org/eclipse/ptp/rm/smoa/core/rmsystem/JobThread.java deleted file mode 100644 index 95ce6ca9b..000000000 --- a/rms/org.eclipse.ptp.rm.smoa.core/src/org/eclipse/ptp/rm/smoa/core/rmsystem/JobThread.java +++ /dev/null @@ -1,442 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2010 Poznan Supercomputing and Networking Center - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Jan Konczak (PSNC) - initial implementation - ******************************************************************************/ - -package org.eclipse.ptp.rm.smoa.core.rmsystem; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.BitSet; -import java.util.HashMap; -import java.util.Map; - -import org.eclipse.core.filesystem.IFileStore; -import org.eclipse.core.runtime.CoreException; -import org.eclipse.ptp.core.attributes.AttributeManager; -import org.eclipse.ptp.core.elements.IPJob; -import org.eclipse.ptp.core.elements.IPMachine; -import org.eclipse.ptp.core.elements.IPNode; -import org.eclipse.ptp.core.elements.IPResourceManager; -import org.eclipse.ptp.core.elements.attributes.JobAttributes; -import org.eclipse.ptp.core.elements.attributes.ProcessAttributes; -import org.eclipse.ptp.remote.core.IRemoteFileManager; -import org.eclipse.ptp.remote.core.PTPRemoteCorePlugin; -import org.eclipse.ptp.rm.smoa.core.rmsystem.PoolingIntervalsAndStatic.SMOAJobState; -import org.eclipse.ptp.rm.smoa.core.rservices.SMOAFileStore; -import org.eclipse.ptp.rm.smoa.core.util.NotifyShell; - -import com.smoa.comp.sdk.SMOAFactory; -import com.smoa.comp.sdk.exceptions.FileNotFoundException; -import com.smoa.comp.sdk.exceptions.NotAuthorizedException; -import com.smoa.comp.sdk.exceptions.UnknownActivityIdentifierException; -import com.smoa.comp.sdk.types.ActivityEndpointReference; -import com.smoa.comp.sdk.types.SMOAActivityStatus; - -/** - * Thread for monitoring a single job - checks state and transfers standard - * output. Allows job's termination. - */ -class JobThread extends Thread { - // The RM that created the job - private final SMOAResourceManager rm; - private final IPResourceManager pRM; - - // The eclipse job control object - private final IPJob jobControl; - - // The SMOA job control object - private final ActivityEndpointReference activityIdentifier; - - private final SMOAFactory factory; - private final IRemoteFileManager fileManager; - - // Remote files used by launch - private final IFileStore out; - private final IFileStore err; - private final IFileStore sh; - private final IFileStore machinefile; - - // Offsets for the output files - private int stdOutOffset = 0; - private int stdErrOffset = 0; - - /** - * Constructs the job thread for monitoring a single job - */ - public JobThread(SMOAResourceManager rm, SMOAFactory factory, ActivityEndpointReference activityEndpointReference, - IPJob jobControl, String out, String err, String sh, String machinefile) { - this.rm = rm; - this.pRM = (IPResourceManager) rm.getAdapter(IPResourceManager.class); - this.factory = factory; - this.activityIdentifier = activityEndpointReference; - this.jobControl = jobControl; - - this.setPriority(MIN_PRIORITY); - this.setName("JobListener for " + activityIdentifier.getActivityUUID()); //$NON-NLS-1$ - - final IRemoteFileManager fileManager_t = PTPRemoteCorePlugin.getDefault() - .getRemoteServices(rm.getControlConfiguration().getRemoteServicesId()) - .getFileManager(rm.getControlConfiguration().getConnection()); - - fileManager = fileManager_t; - - this.out = fileManager.getResource(out); - this.err = fileManager.getResource(err); - if (sh != null) { - this.sh = fileManager.getResource(sh); - } else { - this.sh = null; - } - if (machinefile != null) { - this.machinefile = fileManager.getResource(machinefile); - } else { - this.machinefile = null; - } - - } - - /** Adds given text to standard error, and makes it appear on console */ - void appendStdErr(String stderr) { - final AttributeManager outManager = new AttributeManager(); - outManager.addAttribute(ProcessAttributes.getStderrAttributeDefinition().create(stderr)); - final BitSet bs = new BitSet(); - bs.set(jobControl.getProcessJobRanks().nextSetBit(0)); - jobControl.addProcessAttributes(bs, outManager); - } - - /** Adds given text to standard output, and makes it appear on console */ - void appendStdOut(String stdout) { - final AttributeManager outManager = new AttributeManager(); - outManager.addAttribute(ProcessAttributes.getStdoutAttributeDefinition().create(stdout)); - final BitSet bs = new BitSet(); - bs.set(jobControl.getProcessJobRanks().nextSetBit(0)); - jobControl.addProcessAttributes(bs, outManager); - } - - /** - * Changes job state - */ - void changeState(JobAttributes.State newState) { - if (JobAttributes.State.RUNNING.equals(newState)) { - enteredRunPhase(); - } - - final AttributeManager am = new AttributeManager(); - am.addAttribute(JobAttributes.getStateAttributeDefinition().create(newState)); - rm.getMonitor().getRuntimeSystem().changeJob(jobControl.getID(), am); - } - - /** - * Called when the process state becomes RUNNING. - * - * This is the proper moment for identifying where the processes are - * located, so that debug routing files may be written. - */ - private void enteredRunPhase() { - if (machinefile != null) { - try { - final BufferedReader reader = new BufferedReader(new InputStreamReader(machinefile.openInputStream(0, null))); - - final IPMachine machine = pRM.getMachines()[0]; - - final Map<String, String> nodesByName = new HashMap<String, String>(); - for (final IPNode ipNode : machine.getNodes()) { - nodesByName.put(ipNode.getName(), ipNode.getID()); - } - - String line; - int i = 0; - while ((line = reader.readLine()) != null) { - final BitSet bs = new BitSet(); - bs.set(i++); - final AttributeManager attrs = new AttributeManager(); - - if (!nodesByName.containsKey(line)) { - rm.getMonitor().getRuntimeSystem().addUnknownNode(machine, line); - nodesByName.put(line, ((Integer) nodesByName.size()).toString()); - NotifyShell.open(Messages.JobThread_UnknownNode_title, Messages.JobThread_UnknownNode_text_1 + line - + Messages.JobThread_UnknownNode_text_2); - attrs.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create(nodesByName.get(line))); - } else { - attrs.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create(nodesByName.get(line))); - } - jobControl.addProcessesByJobRanks(bs, attrs); - } - return; - } catch (final CoreException e) { - NotifyShell.open(Messages.JobThread_ErrorOpeningRemote, e.toString()); - - final BitSet bs = new BitSet(1); - bs.set(0); - final AttributeManager jam = new AttributeManager(); - jam.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create("0")); //$NON-NLS-1$ - jobControl.addProcessesByJobRanks(bs, jam); - } catch (final IOException e) { - NotifyShell.open(Messages.JobThread_ErrorReadingRemote, e.toString()); - - if (jobControl.getProcessJobRanks() != null && !jobControl.getProcessJobRanks().isEmpty()) { - return; - } - - final BitSet bs = new BitSet(1); - bs.set(0); - final AttributeManager jam = new AttributeManager(); - jam.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create("0")); //$NON-NLS-1$ - jobControl.addProcessesByJobRanks(bs, jam); - } - } else { - final BitSet bs = new BitSet(1); - bs.set(0); - final AttributeManager jam = new AttributeManager(); - jam.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create("0")); //$NON-NLS-1$ - jobControl.addProcessesByJobRanks(bs, jam); - } - } - - /** - * Called if an exception has been thrown while monitoring job - */ - void exceptionCaught(Exception e) { - final AttributeManager am = new AttributeManager(); - - am.addAttribute(JobAttributes.getStatusAttributeDefinition().create(Messages.JobThread_ExceptionByMonitoring)); - - am.addAttribute(JobAttributes.getStateAttributeDefinition().create(JobAttributes.State.COMPLETED)); - - am.addAttribute(PoolingIntervalsAndStatic.exceptionAttrDef.create(e.getLocalizedMessage())); - - NotifyShell.open(Messages.JobThread_ExceptionByMonitoring, e.getLocalizedMessage()); - - rm.getMonitor().getRuntimeSystem().changeJob(jobControl.getID(), am); - } - - /** Executed after the job reached terminal state */ - private void jobFinished(SMOAActivityStatus status) { - - try { - // If the job jumped from queued to finished state, we didn't - // add any processes yet - if (jobControl.getProcessJobRanks() == null || jobControl.getProcessJobRanks().isEmpty()) { - final BitSet bs = new BitSet(1); - bs.set(0); - final AttributeManager jam = new AttributeManager(); - jam.addAttribute(ProcessAttributes.getNodeIdAttributeDefinition().create("0")); //$NON-NLS-1$ - jobControl.addProcessesByJobRanks(bs, jam); - } - - processOutAndErr(); - - try { - out.delete(0, null); - err.delete(0, null); - if (sh != null) { - sh.delete(0, null); - } - if (machinefile != null) { - machinefile.delete(0, null); - } - } catch (final CoreException e) { - NotifyShell.open(Messages.JobThread_ErrorDeletingTempFiles_title, Messages.JobThread_ErrorDeletingTempFiles_text - + e.getLocalizedMessage()); - e.printStackTrace(); - } - } catch (final RuntimeException e) { - // Happens if there is a problem with stdout/err file - } - - final AttributeManager am = new AttributeManager(); - - String jobStatus = null; - - switch (PoolingIntervalsAndStatic.getEquivalentJobState(status)) { - case Finished: - jobStatus = Messages.JobThread_JobStateFinisedWithStatus + status.getEndStatus().getExitStatus(); - break; - case Failed: - jobStatus = Messages.JobThread_JobStateFailed; - break; - case Cancelled: - jobStatus = Messages.JobThread_JobStateCancelled; - break; - default: - jobStatus = Messages.JobThread_JobStateUnknown; - } - - am.addAttribute(JobAttributes.getStatusAttributeDefinition().create(jobStatus)); - - JobAttributes.State state; - state = JobAttributes.State.COMPLETED; - am.addAttribute(JobAttributes.getStateAttributeDefinition().create(state)); - - rm.getMonitor().getRuntimeSystem().changeJob(jobControl.getID(), am); - } - - /** - * Takes care about reading out and err streams and forwards them on console - */ - private void processOutAndErr() { - - /* Out */ - InputStream is; - try { - if (out instanceof SMOAFileStore) { - is = ((SMOAFileStore) out).openInputStream(0, null, stdOutOffset); - } else { - is = out.openInputStream(0, null); - is.skip(stdOutOffset); - } - - final byte[] buffer = new byte[512]; - - for (int count = is.read(buffer); count > 0; count = is.read(buffer)) { - stdOutOffset += count; - appendStdOut(new String(buffer).substring(0, count)); - } - } catch (final CoreException e) { - if (e.getCause() instanceof FileNotFoundException) { - return; - } - throw new RuntimeException(e); - } catch (final IOException e) { - NotifyShell.open(Messages.JobThread_ErrorOut, e.getLocalizedMessage()); - } - - /* Err */ - - try { - if (err instanceof SMOAFileStore) { - is = ((SMOAFileStore) err).openInputStream(0, null, stdErrOffset); - } else { - is = err.openInputStream(0, null); - is.skip(stdErrOffset); - } - - final byte[] buffer = new byte[512]; - - for (int count = is.read(buffer); count > 0; count = is.read(buffer)) { - stdErrOffset += count; - if (count == buffer.length) { - appendStdErr(new String(buffer)); - } else { - appendStdErr(new String(buffer).substring(0, count)); - } - } - } catch (final CoreException e) { - if (e.getCause() instanceof FileNotFoundException) { - return; - } - throw new RuntimeException(e); - } catch (final IOException e) { - NotifyShell.open(Messages.JobThread_ErrorErr, e.getLocalizedMessage()); - } - } - - /** Loop for monitoring task */ - @Override - public void run() { - try { - - SMOAActivityStatus status = factory.getActivityStatus(activityIdentifier); - - SMOAActivityStatus prevStatus = null; - - long nextStateCheck = System.currentTimeMillis(); - long nextOutCheck = nextStateCheck; - - // Till the state is not final, we pool the job and out streams - while (true) { - - // Status - if (nextStateCheck <= System.currentTimeMillis()) { - nextStateCheck = System.currentTimeMillis() + PoolingIntervalsAndStatic.getPoolingIntervalTask(); - - final SMOAJobState statusS = PoolingIntervalsAndStatic.getEquivalentJobState(status); - - SMOAJobState prevstatusS = null; - - if (prevStatus != null) { - prevstatusS = PoolingIntervalsAndStatic.getEquivalentJobState(prevStatus); - } - - if (prevStatus == null || !statusS.equals(prevstatusS)) { - switch (statusS) { - case Stage_in: - case Held: - case Queued: - changeState(JobAttributes.State.STARTING); - break; - case Stage_out: - case Executing: - changeState(JobAttributes.State.RUNNING); - break; - case Suspended: - changeState(JobAttributes.State.SUSPENDED); - break; - - case Cancelled: - break; - case Failed: - break; - case Finished: - break; - } - prevStatus = status; - } - status = factory.getActivityStatus(activityIdentifier); - - if (status.isFinalState()) { - break; - } - } - - // Out - final JobAttributes.State state = jobControl.getAttribute(JobAttributes.getStateAttributeDefinition()).getValue(); - if (state == JobAttributes.State.RUNNING && nextOutCheck <= System.currentTimeMillis()) { - nextOutCheck = System.currentTimeMillis() + PoolingIntervalsAndStatic.getPoolingIntervalOut(); - - processOutAndErr(); - } - try { - final long nextCheck = Math.min(nextOutCheck - System.currentTimeMillis(), - nextStateCheck - System.currentTimeMillis()); - if (nextCheck > 0) { - sleep(nextCheck); - } - } catch (final InterruptedException e) { - // The job has been terminated, ignore - } - } - - jobFinished(status); - - } catch (final NotAuthorizedException e) { - exceptionCaught(e); - } catch (final UnknownActivityIdentifierException e) { - exceptionCaught(e); - } - - rm.removeJobThread(jobControl.getID()); - } - - /** - * Terminates the monitored job - */ - public boolean terminate() { - try { - factory.terminateActivity(activityIdentifier); - this.interrupt(); - } catch (final Exception e) { - return false; - } - return true; - } -} |