|
1 | | -% TFTP regression tests for Scapy |
| 1 | +% Regression tests for TFTP |
2 | 2 |
|
3 | 3 | # More information at http://www.secdev.org/projects/UTscapy/ |
4 | 4 |
|
| 5 | ++ TFTP coverage tests |
5 | 6 |
|
6 | | -############ |
7 | | -############ |
8 | | -+ TFTP tests |
| 7 | += Test answers |
| 8 | + |
| 9 | +assert TFTP_DATA(block=1).answers(TFTP_RRQ()) |
| 10 | +assert not TFTP_WRQ().answers(TFTP_RRQ()) |
| 11 | +assert not TFTP_RRQ().answers(TFTP_WRQ()) |
| 12 | +assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1)) |
| 13 | +assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1)) |
| 14 | +assert TFTP_ACK(block=0).answers(TFTP_RRQ()) |
| 15 | +assert not TFTP_ACK().answers(TFTP_ACK()) |
| 16 | +assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK()) |
| 17 | +assert TFTP_OACK().answers(TFTP_WRQ()) |
9 | 18 |
|
10 | 19 | = TFTP Options |
| 20 | + |
11 | 21 | x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")]) |
12 | 22 | assert raw(x) == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00' |
13 | 23 | y=IP(raw(x)) |
14 | 24 | y[TFTP_Option].oname |
15 | 25 | y[TFTP_Option:2].oname |
16 | 26 | assert len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == b"blksize" |
| 27 | + |
| 28 | + |
| 29 | ++ TFTP Automatons |
| 30 | +~ linux |
| 31 | + |
| 32 | += Utilities |
| 33 | +~ linux |
| 34 | + |
| 35 | +from scapy.automaton import select_objects |
| 36 | + |
| 37 | +class MockTFTPSocket(object): |
| 38 | + packets = [] |
| 39 | + def __init__(self, iface): |
| 40 | + self.iface = iface |
| 41 | + def recv(self, n=None): |
| 42 | + pkt = self.packets.pop(0) |
| 43 | + return pkt |
| 44 | + def send(self, *args, **kargs): |
| 45 | + pass |
| 46 | + def close(self): |
| 47 | + pass |
| 48 | + @classmethod |
| 49 | + def select(classname, inputs, remain): |
| 50 | + test = [s for s in inputs if isinstance(s, classname)] |
| 51 | + if test: |
| 52 | + if len(test[0].packets): |
| 53 | + return test |
| 54 | + else: |
| 55 | + inputs = [s for s in inputs if not isinstance(s, classname)] |
| 56 | + return select_objects(inputs, remain) |
| 57 | + |
| 58 | + |
| 59 | += TFTP_read() automaton |
| 60 | +~ linux |
| 61 | + |
| 62 | +class MockReadSocket(MockTFTPSocket): |
| 63 | + packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512), |
| 64 | + IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"] |
| 65 | + |
| 66 | +tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807, |
| 67 | + ll=MockReadSocket, |
| 68 | + recvsock=MockReadSocket, debug=5) |
| 69 | + |
| 70 | +res = tftp_read.run() |
| 71 | +assert res == (b"P" * 512 + b"<3") |
| 72 | + |
| 73 | += TFTP_read() automaton error |
| 74 | +~ linux |
| 75 | + |
| 76 | +class MockReadSocket(MockTFTPSocket): |
| 77 | + packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")] |
| 78 | + |
| 79 | +tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807, |
| 80 | + ll=MockReadSocket, |
| 81 | + recvsock=MockReadSocket) |
| 82 | + |
| 83 | +try: |
| 84 | + tftp_read.run() |
| 85 | + assert False |
| 86 | +except Automaton.ErrorState as e: |
| 87 | + assert "Reached ERROR" in str(e) |
| 88 | + assert "ERROR Access violation" in str(e) |
| 89 | + |
| 90 | + |
| 91 | += TFTP_write() automaton |
| 92 | +~ linux |
| 93 | + |
| 94 | +data_received = b"" |
| 95 | + |
| 96 | +class MockWriteSocket(MockTFTPSocket): |
| 97 | + packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0), |
| 98 | + IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1) ] |
| 99 | + def send(self, *args, **kargs): |
| 100 | + if len(args) and Raw in args[0]: |
| 101 | + global data_received |
| 102 | + data_received += args[0][Raw].load |
| 103 | + |
| 104 | +tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807, |
| 105 | + ll=MockWriteSocket, |
| 106 | + recvsock=MockWriteSocket) |
| 107 | + |
| 108 | +tftp_write.run() |
| 109 | +assert data_received == (b"P" * 767 + b"Scapy <3") |
| 110 | + |
| 111 | += TFTP_write() automaton error |
| 112 | +~ linux |
| 113 | + |
| 114 | +class MockWriteSocket(MockTFTPSocket): |
| 115 | + packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")] |
| 116 | + |
| 117 | +tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807, |
| 118 | + ll=MockWriteSocket, |
| 119 | + recvsock=MockWriteSocket) |
| 120 | + |
| 121 | +try: |
| 122 | + tftp_write.run() |
| 123 | + assert False |
| 124 | +except Automaton.ErrorState as e: |
| 125 | + assert "Reached ERROR" in str(e) |
| 126 | + assert "ERROR Access violation" in str(e) |
| 127 | + |
| 128 | + |
| 129 | += TFTP_WRQ_server() automaton |
| 130 | +~ linux |
| 131 | + |
| 132 | +class MockWRQSocket(MockTFTPSocket): |
| 133 | + packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"), |
| 134 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512), |
| 135 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"] |
| 136 | + |
| 137 | +tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807, |
| 138 | + ll=MockWRQSocket, |
| 139 | + recvsock=MockWRQSocket) |
| 140 | +assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3")) |
| 141 | + |
| 142 | += TFTP_WRQ_server() automaton with options |
| 143 | +~ linux |
| 144 | + |
| 145 | +class MockWRQSocket(MockTFTPSocket): |
| 146 | + packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]), |
| 147 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100), |
| 148 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"] |
| 149 | + |
| 150 | +tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807, |
| 151 | + ll=MockWRQSocket, |
| 152 | + recvsock=MockWRQSocket) |
| 153 | +assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3")) |
| 154 | + |
| 155 | += TFTP_RRQ_server() automaton |
| 156 | +~ linux |
| 157 | + |
| 158 | +sent_data = "P" * 512 + "<3" |
| 159 | +import tempfile |
| 160 | +filename = tempfile.mktemp(suffix=".txt") |
| 161 | +fdesc = open(filename, "w") |
| 162 | +fdesc.write(sent_data) |
| 163 | +fdesc.close() |
| 164 | + |
| 165 | +received_data = "" |
| 166 | + |
| 167 | +class MockRRQSocket(MockTFTPSocket): |
| 168 | + packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]), |
| 169 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=filename[5:]) / TFTP_Options(), |
| 170 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1), |
| 171 | + IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2) ] |
| 172 | + def send(self, *args, **kargs): |
| 173 | + if len(args): |
| 174 | + pkt = args[0] |
| 175 | + if TFTP_DATA in pkt: |
| 176 | + global received_data |
| 177 | + received_data += pkt[Raw].load.decode("utf-8") |
| 178 | + |
| 179 | +tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir="/tmp/", serve_one=True, |
| 180 | + ll=MockRRQSocket, |
| 181 | + recvsock=MockRRQSocket, debug=4) |
| 182 | +tftp_rrq.run() |
| 183 | +assert received_data == sent_data |
| 184 | + |
| 185 | +import os |
| 186 | +os.unlink(filename) |
0 commit comments