Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleherbau2011-05-12 05:05:20 -0400
committeraleherbau2011-05-12 05:05:20 -0400
commitfa85be44ccd955603e514bc2ed7a107e1f56f8f1 (patch)
treebc3b39b96184c89d4ef1f744107c673671bea7b3
parentcd93aae3bec36af01c72b4433c8a4c647560a8de (diff)
downloadorg.eclipse.tcf-fa85be44ccd955603e514bc2ed7a107e1f56f8f1.tar.gz
org.eclipse.tcf-fa85be44ccd955603e514bc2ed7a107e1f56f8f1.tar.xz
org.eclipse.tcf-fa85be44ccd955603e514bc2ed7a107e1f56f8f1.zip
TCF Python: Fixed frequent 10054 error messages on windows, code cleanup
-rw-r--r--python/src/tcf/EventQueue.py10
-rw-r--r--python/src/tcf/services/local/LocatorService.py40
-rw-r--r--python/src/tcf/shell.py5
-rw-r--r--python/todo.twiki2
4 files changed, 31 insertions, 26 deletions
diff --git a/python/src/tcf/EventQueue.py b/python/src/tcf/EventQueue.py
index 1c8a3e90c..546991b8f 100644
--- a/python/src/tcf/EventQueue.py
+++ b/python/src/tcf/EventQueue.py
@@ -9,7 +9,7 @@
# * Wind River Systems - initial API and implementation
# *******************************************************************************
-import threading, exceptions
+import threading
import protocol
class EventQueue(object):
@@ -33,7 +33,7 @@ class EventQueue(object):
self.__is_waiting = False
self.__lock.notifyAll()
self.__thread.join()
- except exceptions.Exception as e:
+ except BaseException as e:
protocol.log("Failed to shutdown TCF event dispatch thread", e)
def isShutdown(self):
@@ -53,13 +53,13 @@ class EventQueue(object):
self.__lock.wait()
r, args, kwargs = self.__queue.pop(0)
r(*args, **kwargs)
- except exceptions.Exception as x:
+ except BaseException as x:
self.__error(x)
def invokeLater(self, r, *args, **kwargs):
- assert r;
+ assert r
with self.__lock:
- if self.__is_shutdown: raise exceptions.RuntimeError("TCF event dispatcher has shut down")
+ if self.__is_shutdown: raise RuntimeError("TCF event dispatcher has shut down")
self.__queue.append((r, args, kwargs))
if self.__is_waiting:
self.__is_waiting = False
diff --git a/python/src/tcf/services/local/LocatorService.py b/python/src/tcf/services/local/LocatorService.py
index 57ff41679..aa5550e1e 100644
--- a/python/src/tcf/services/local/LocatorService.py
+++ b/python/src/tcf/services/local/LocatorService.py
@@ -15,7 +15,7 @@ for peers and to collect and maintain up-to-date
data about peer's attributes.
"""
-import threading, time, exceptions, socket, cStringIO
+import threading, time, socket, cStringIO
from tcf.services import locator
from tcf.util import logging
from tcf.channel import fromJSONSequence, toJSONSequence
@@ -150,7 +150,7 @@ class LocatorService(locator.LocatorService):
except RuntimeError:
# TCF event dispatch is shut down
return
- except exceptions.Exception as x:
+ except Exception as x:
service._log("Unhandled exception in TCF discovery timer thread", x)
self.timer_thread = TimerThread(self.__refresh_timer)
class DNSLookupThread(threading.Thread):
@@ -161,7 +161,7 @@ class LocatorService(locator.LocatorService):
with LocatorService._addr_cache_lock:
if not LocatorService.addr_request:
LocatorService._addr_cache_lock.wait(locator.DATA_RETENTION_PERIOD)
- msec = int(time.time())
+ msec = int(time.time() * 1000)
for a in LocatorService.addr_cache.values():
if a.time_stamp + locator.DATA_RETENTION_PERIOD * 10 < msec:
if a.used:
@@ -184,7 +184,7 @@ class LocatorService(locator.LocatorService):
a.address = InetAddress(a.host, addr)
a.time_stamp = time
a.used = False
- except exceptions.BaseException as x:
+ except BaseException as x:
service._log("Unhandled exception in TCF discovery DNS lookup thread", x)
self.dns_lookup_thread = DNSLookupThread()
class InputThread(threading.Thread):
@@ -204,10 +204,12 @@ class LocatorService(locator.LocatorService):
return
except socket.error as x:
if sock != service.socket: continue
+ # frequent error on windows, unknown reason
+ if x.errno == 10054: continue
port = sock.getsockname()[1]
service._log("Cannot read from datagram socket at port %d" % port, x)
time.sleep(2)
- except exceptions.BaseException as x:
+ except BaseException as x:
service._log("Unhandled exception in socket reading thread", x)
self.input_thread = InputThread(self.__handleDatagramPacket)
try:
@@ -240,8 +242,8 @@ class LocatorService(locator.LocatorService):
self.listeners.append(LocatorListener())
self.__refreshSubNetList()
self.__sendPeersRequest(None, 0)
- self.__sendAll(None, 0, None, int(time.time()))
- except exceptions.Exception as x:
+ self.__sendAll(None, 0, None, int(time.time() * 1000))
+ except Exception as x:
self._log("Cannot open UDP socket for TCF discovery protocol", x)
@classmethod
@@ -254,7 +256,7 @@ class LocatorService(locator.LocatorService):
def __makeErrorReport(self, code, msg):
err = {}
- err[errors.ERROR_TIME] = int(time.time())
+ err[errors.ERROR_TIME] = int(time.time() * 1000)
err[errors.ERROR_CODE] = code
err[errors.ERROR_FORMAT] = msg
return err
@@ -282,7 +284,7 @@ class LocatorService(locator.LocatorService):
channel.sendResult(token, toJSONSequence((None, arr)))
else:
channel.rejectCommand(token)
- except exceptions.BaseException as x:
+ except BaseException as x:
channel.terminate(x)
def _log(self, msg, x):
@@ -305,7 +307,7 @@ class LocatorService(locator.LocatorService):
i.address = InetAddress(host, addr)
except socket.gaierror:
pass
- i.time_stamp = int(time.time())
+ i.time_stamp = int(time.time() * 1000)
else:
# socket.gethostbyname() can cause long delay - delegate to background thread
LocatorService.addr_request = True
@@ -315,7 +317,7 @@ class LocatorService(locator.LocatorService):
return i.address
def __refresh_timer(self):
- tm = int(time.time())
+ tm = int(time.time() * 1000)
# Cleanup slave table
if self.slaves:
i = 0
@@ -373,7 +375,7 @@ class LocatorService(locator.LocatorService):
subNetSet = set()
try:
self.__getSubNetList(subNetSet)
- except exceptions.BaseException as x:
+ except BaseException as x:
self._log("Cannot get list of network interfaces", x)
for s in tuple(self.subnets):
if s in subNetSet: continue
@@ -426,7 +428,7 @@ class LocatorService(locator.LocatorService):
if self.out_buf[4] == locator.CONF_PEER_INFO:
map = self.__parsePeerAttributes(self.out_buf, 8)
self.__traceDiscoveryPacket(False, self.packetTypes[self.out_buf[4]], map, addr, port)
- except exceptions.BaseException as x:
+ except BaseException as x:
self._log("Cannot send datagram packet to %s" % addr, x)
return False
return True
@@ -557,7 +559,7 @@ class LocatorService(locator.LocatorService):
def __handleDatagramPacket(self, p):
try:
- tm = int(time.time())
+ tm = int(time.time() * 1000)
buf = p.getData()
len = p.getLength()
if len < 8: return
@@ -590,7 +592,7 @@ class LocatorService(locator.LocatorService):
subnet.last_slaves_req_time = tm
if subnet.address == remote_address and remote_port == DISCOVEY_PORT:
self.last_master_packet_time = tm
- except exceptions.BaseException as x:
+ except BaseException as x:
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
def __handlePeerInfoPacket(self, p):
@@ -598,7 +600,7 @@ class LocatorService(locator.LocatorService):
map = self.__parsePeerAttributes(p.getData(), p.getLength())
if __TRACE_DISCOVERY__: self.__traceDiscoveryPacket(True, "CONF_PEER_INFO", map, p)
id = map.get(peer.ATTR_ID)
- if id is None: raise exceptions.RuntimeError("Invalid peer info: no ID")
+ if id is None: raise RuntimeError("Invalid peer info: no ID")
ok = True
host = map.get(peer.ATTR_IP_HOST)
if host is not None:
@@ -615,7 +617,7 @@ class LocatorService(locator.LocatorService):
_peer.updateAttributes(map)
elif _peer is None:
peer.RemotePeer(map)
- except exceptions.BaseException as x:
+ except BaseException as x:
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
def __handleReqInfoPacket(self, p, sl, tm):
@@ -672,12 +674,12 @@ class LocatorService(locator.LocatorService):
if time_val < time_now - delta or time_val > time_now + delta:
msg = "Invalid slave info timestamp: %s -> %s" % (
timestamp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_val/1000.)))
- self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), exceptions.Exception(msg))
+ self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), Exception(msg))
time_val = time_now - locator.DATA_RETENTION_PERIOD / 2
self.__addSlave(addr, port, time_val, time_now)
if __TRACE_DISCOVERY__:
self.__traceDiscoveryPacket(True, "CONF_SLAVES_INFO", trace_map, p)
- except exceptions.BaseException as x:
+ except BaseException as x:
self._log("Invalid datagram packet received from %s/%s" % (p.getAddress(), p.getPort()), x)
def __handleReqSlavesPacket(self, p, sl, tm):
diff --git a/python/src/tcf/shell.py b/python/src/tcf/shell.py
index dacd3d3fe..7e0f8c54f 100644
--- a/python/src/tcf/shell.py
+++ b/python/src/tcf/shell.py
@@ -34,6 +34,8 @@ from tcf.util import sync, event
from tcf import protocol, channel
class print_peers:
+ def __call__(self):
+ return tcf.peers()
def __repr__(self):
peers = tcf.peers()
return '\n'.join(peers.keys())
@@ -53,6 +55,7 @@ class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.Chann
super(Shell, self).interact(banner)
finally:
protocol.invokeLater(protocol.removeChannelOpenListener, self)
+ protocol.getEventQueue().shutdown()
def onChannelOpen(self, channel):
wrapper = sync.DispatchWrapper(channel)
self.locals["channel"] = wrapper
@@ -71,7 +74,7 @@ class Shell(code.InteractiveConsole, protocol.ChannelOpenListener, channel.Chann
def interact():
try:
# enable commandline editing if available
- import readline
+ import readline #@UnusedImport
except ImportError:
pass
shell = Shell()
diff --git a/python/todo.twiki b/python/todo.twiki
index 602325a87..028a48279 100644
--- a/python/todo.twiki
+++ b/python/todo.twiki
@@ -28,7 +28,7 @@
* SysMonitor
* Local services:
* [incomplete] ZeroCopy
- * [started] Locator service (discovery)
+ * [done] Locator service (discovery)
* Utilities:
* [done] ACPM data cache
* [done] TCFTask

Back to the top