-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsocketmon.py
executable file
·165 lines (146 loc) · 5.11 KB
/
socketmon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import sys
import time
import socket
sys.path.append("..")
sys.path.append("../..")
from Peach.agent import Monitor
from twisted.internet import reactor, protocol
from threading import Thread
# Module-level state shared between SocketMonitor and the Twisted Listener
# (which runs inside the reactor started by ReactorThread).

# "<host>:<port>" string stored by Listener.connectionMade; reset to None by
# SocketMonitor.OnTestStarting and reported by SocketMonitor.GetMonitorData.
g_socketData = None
# Set to True by Listener.connectionMade when a connection is recorded; reset
# by SocketMonitor.OnTestStarting and returned by SocketMonitor.DetectedFault.
g_faultDetected = False
# Set to True by SocketMonitor.OnShutdown; while set, the next
# Listener.connectionMade stops the reactor instead of recording a fault.
g_stopReactor = False
# TODO: Port this over to use ThreadedSelectReactor
# http://twistedmatrix.com/documents/current/api/twisted.internet._threadedselect.ThreadedSelectReactor.html
class SocketMonitor(Monitor):
    """
    Monitor that flags a fault when a connection is made to a listening
    socket.

    A ReactorThread is started lazily on the first test case; the Twisted
    Listener it installs records connection details in the module-level
    globals g_socketData / g_faultDetected, which this class resets and
    reports.
    """

    def __init__(self, args):
        """
        Constructor. Arguments are supplied via the Peach XML file.

        @type args: Dictionary
        @param args: Dictionary of parameters. Recognised keys are 'IP'
                     (default "127.0.0.1"), 'Port' (default 8002) and
                     'Protocol' (default "tcp"). Any ''' sequences in the
                     values are stripped.
        @raise Exception: Any error raised while parsing the arguments is
                          re-raised after _internalError has been set.
        """
        try:
            self._name = "Socket Monitor"
            self._ip = ""
            self._port = ""
            self._protocol = ""
            self._thread = None
            self._internalError = False
            self._stopOnFault = False

            if 'IP' in args:
                self._ip = str(args["IP"]).replace("'''", "")
            else:
                print("Socket Monitor: No IP specified, using 127.0.0.1")
                self._ip = "127.0.0.1"

            if 'Port' in args:
                # str() first so an integer Port value is accepted too.
                self._port = int(str(args['Port']).replace("'''", ""))
                print("Socket Monitor: Listening on Port: {}"
                      .format(self._port))
            else:
                print("Socket Monitor: No Port specified, using 8002")
                self._port = 8002

            if 'Protocol' in args:
                self._protocol = str(args['Protocol']).replace("'''", "")
                print("Socket Monitor: Protocol = %s" % self._protocol)
            else:
                print("Socket Monitor: No Protocol specified, using TCP")
                self._protocol = "tcp"
        except Exception:
            self._internalError = True
            print("Socket Monitor: Caught Exception Parsing Arguments")
            raise

    def OnTestStarting(self):
        """
        Called right before start of test case or variation.

        Starts the reactor thread on first use and clears the data and
        fault flag left over from the previous test case.
        """
        global g_faultDetected
        global g_socketData
        # fire up twisted if it hasn't been already
        if self._thread is None:
            self._thread = ReactorThread(self._ip, self._port,
                                         self._protocol)
            self._thread.start()
        g_socketData = None
        g_faultDetected = False

    def GetMonitorData(self):
        """
        Get any monitored data from a test case.

        @rtype: Dictionary
        @return: Single entry 'SocketData.txt' holding str() of the data
                 recorded by the listener (the string "None" if nothing
                 was recorded).
        """
        return {'SocketData.txt': str(g_socketData)}

    def DetectedFault(self):
        """
        Check if a fault was detected.

        @rtype: Boolean
        @return: True if the listener recorded a connection since the last
                 OnTestStarting call.
        """
        return g_faultDetected

    def OnShutdown(self):
        """
        Called when Agent is shutting down, typically at end of a test run or
        when a Stop-Run occurs.

        Sets g_stopReactor and then pokes the listening socket so that the
        listener's connectionMade runs and stops the reactor.
        """
        global g_stopReactor
        g_stopReactor = True

        # Nothing to poke if the reactor thread was never started or has
        # already exited; connecting would only raise.
        if self._thread is None or not self._thread.is_alive():
            return

        # hack to stop reactor
        # NOTE(review): for non-TCP protocols this only connect()s a datagram
        # socket without sending anything -- confirm that this actually
        # reaches the listener.
        if self._protocol.lower() == "tcp":
            sock_type = socket.SOCK_STREAM
        else:
            sock_type = socket.SOCK_DGRAM
        s = socket.socket(socket.AF_INET, sock_type)
        try:
            s.connect((self._ip, int(self._port)))
        finally:
            # Always release the descriptor, even if connect() fails.
            s.close()

    def StopRun(self):
        """
        Return True to force test run to fail. This should return True if an
        unrecoverable error occurs.
        """
        return self._internalError
class ReactorThread(Thread):
    """
    Thread that runs the Twisted reactor with a Listener bound to the
    configured port.
    """

    def __init__(self, ip, port, protocol):
        """
        Store the listening parameters; nothing is opened until run().

        @param ip: Stored as _ip (not used by run()).
        @param port: Port to listen on; converted with int() in run().
        @param protocol: "tcp" (case-insensitive) selects listenTCP, any
                         other value selects listenUDP.
        """
        Thread.__init__(self)
        self._ip, self._port, self._protocol = ip, port, protocol
        self._factory = None

    def run(self):
        """
        Build the server factory, start listening and run the reactor until
        it is stopped.
        """
        self._factory = factory = protocol.ServerFactory()
        factory.protocol = Listener
        use_tcp = self._protocol.lower() == "tcp"
        port_number = int(self._port)
        if use_tcp:
            reactor.listenTCP(port_number, factory)
        else:
            # NOTE(review): listenUDP is documented as taking a
            # DatagramProtocol instance; passing a ServerFactory here looks
            # suspicious -- confirm the UDP path works.
            reactor.listenUDP(port_number, factory)
        # Signal handlers are disabled via installSignalHandlers=0 because
        # the reactor runs inside this Thread.
        reactor.run(installSignalHandlers=0)
class Listener(protocol.Protocol):
    """
    Protocol that records every incoming connection as a fault, or stops
    the reactor once shutdown has been requested.
    """

    def connectionMade(self):
        """
        Handle a new connection.

        If g_stopReactor is set the reactor is stopped; otherwise the
        address returned by self.transport.getHost() is stored in
        g_socketData as "<host>:<port>" and g_faultDetected is set.
        """
        global g_stopReactor, g_socketData, g_faultDetected

        # hack until ThreadedSelectReactor is implemented
        if g_stopReactor:
            print("Socket Monitor: Stopping Reactor")
            reactor.stop()
            return

        address = self.transport.getHost()
        g_socketData = ":".join((address.host, str(address.port)))
        g_faultDetected = True
if __name__ == "__main__":
    # Manual smoke test: listen on 127.0.0.1:8002 over TCP and print the
    # fault flag every two seconds until interrupted with Ctrl-C.
    d = {
        "IP": "127.0.0.1",
        "Port": "8002",
        "Protocol": "tcp"
    }
    a = SocketMonitor(d)
    try:
        while True:
            # Starts the reactor thread on the first pass and resets the
            # fault state on every pass.
            a.OnTestStarting()
            time.sleep(2)
            print(a.DetectedFault())
            a.OnTestFinished()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end this endless loop.
        pass
    finally:
        # Previously unreachable after the infinite loop; always ask the
        # reactor thread to stop on the way out.
        a.OnShutdown()