|
import codecs
import io

from fuzzeddataprovider import FuzzedDataProvider
| 4 | + |
# Codec names that op_decode() chooses from for one-shot decoding.
# NOTE(review): the order is kept exactly as-is; the fuzzer picks an entry
# via PickValueInList, so reordering would presumably change which codec an
# existing corpus input selects -- confirm before rearranging.
DECODERS = [
    "utf-7", "shift_jis", "euc-jp", "gb2312", "big5", "iso-2022-jp",
    "euc-kr", "gb18030", "big5hkscs", "charmap", "ascii", "latin-1",
    "cp1252", "unicode_escape", "raw_unicode_escape", "utf-16", "utf-32",
]
| 10 | + |
# Codec names that op_encode() chooses from for one-shot encoding.
# NOTE(review): the order is kept exactly as-is; the fuzzer picks an entry
# via PickValueInList, so reordering would presumably change which codec an
# existing corpus input selects -- confirm before rearranging.
ENCODERS = [
    "shift_jis", "euc-jp", "gb2312", "big5", "iso-2022-jp", "euc-kr",
    "gb18030", "big5hkscs", "unicode_escape", "raw_unicode_escape",
    "utf-7", "utf-8", "utf-16", "utf-16-le", "utf-16-be", "utf-32",
    "latin-1", "ascii", "charmap",
]
| 17 | + |
# Codecs exercised through the incremental decoder / encoder APIs
# (codecs.getincrementaldecoder / codecs.getincrementalencoder).
INC_DEC_CODECS = ["shift_jis", "gb18030", "utf-16"]
INC_ENC_CODECS = ["shift_jis", "utf-8"]
| 20 | + |
# Operation selectors. FuzzerRunOne draws one integer in the inclusive range
# [OP_DECODE, OP_STREAM_READ], so these values must stay contiguous.
OP_DECODE = 0
OP_ENCODE = 1
OP_INCREMENTAL_DECODE = 2
OP_INCREMENTAL_ENCODE = 3
OP_STREAM_READ = 4
| 26 | + |
def op_decode(fdp):
    """Decode all remaining fuzz bytes in one shot with a fuzzer-chosen codec.

    Args:
        fdp: Fuzz data provider; this function calls its PickValueInList,
            ConsumeBytes and remaining_bytes methods.

    The codec comes from DECODERS and the 'replace' error handler is used.
    The decoded result is discarded; only the call itself matters.
    """
    codec = fdp.PickValueInList(DECODERS)
    data = fdp.ConsumeBytes(fdp.remaining_bytes())
    codecs.decode(data, codec, 'replace')
| 31 | + |
def op_encode(fdp):
    """Encode a fuzzer-generated string in one shot with a fuzzer-chosen codec.

    Args:
        fdp: Fuzz data provider; this function calls its PickValueInList,
            remaining_bytes, ConsumeIntInRange and ConsumeUnicode methods.

    The codec comes from ENCODERS and the 'replace' error handler is used.
    Nothing is encoded when no input is left after picking the codec.
    The encoded result is discarded.
    """
    codec = fdp.PickValueInList(ENCODERS)
    remaining = fdp.remaining_bytes()
    if remaining <= 0:
        # No input left to build a string from.
        return
    # Requested size is at least 1 and capped at 10000.
    n = fdp.ConsumeIntInRange(1, min(remaining, 10000))
    s = fdp.ConsumeUnicode(n)
    codecs.encode(s, codec, 'replace')
| 39 | + |
def op_incremental_decode(fdp):
    """Feed fuzz bytes to an incremental decoder in two chunks.

    Args:
        fdp: Fuzz data provider; this function calls its PickValueInList,
            ConsumeIntInRange, ConsumeBytes and remaining_bytes methods.

    The codec comes from INC_DEC_CODECS and the decoder is created with the
    'replace' error handler. After both chunks are decoded, getstate() and
    reset() are called on the decoder as well. All results are discarded.
    """
    codec = fdp.PickValueInList(INC_DEC_CODECS)
    # The first chunk's requested size is fuzzer-controlled (0..10000);
    # the second chunk takes whatever input is left.
    chunk1_size = fdp.ConsumeIntInRange(0, 10000)
    chunk1 = fdp.ConsumeBytes(chunk1_size)
    chunk2 = fdp.ConsumeBytes(fdp.remaining_bytes())
    decoder = codecs.getincrementaldecoder(codec)('replace')
    decoder.decode(chunk1)
    # Second positional argument is `final`: this is the last chunk.
    decoder.decode(chunk2, True)
    decoder.getstate()
    decoder.reset()
| 50 | + |
def op_incremental_encode(fdp):
    """Feed a fuzzer-generated string to an incremental encoder in two parts.

    Args:
        fdp: Fuzz data provider; this function calls its PickValueInList,
            remaining_bytes, ConsumeIntInRange and ConsumeUnicode methods.

    The codec comes from INC_ENC_CODECS and the encoder is created with the
    'replace' error handler. Nothing is encoded when no input is left after
    picking the codec. All results are discarded.
    """
    codec = fdp.PickValueInList(INC_ENC_CODECS)
    remaining = fdp.remaining_bytes()
    if remaining <= 0:
        # No input left to build a string from.
        return
    # Requested size is at least 1 and capped at 10000.
    n = fdp.ConsumeIntInRange(1, min(remaining, 10000))
    s = fdp.ConsumeUnicode(n)
    # Fuzzer-chosen split point anywhere in the string, both ends included.
    split = fdp.ConsumeIntInRange(0, len(s))
    encoder = codecs.getincrementalencoder(codec)('replace')
    encoder.encode(s[:split])
    # The encoder is reset between the two halves, so reset() runs on an
    # encoder that has already seen input.
    encoder.reset()
    encoder.encode(s[split:])
    encoder.getstate()
| 63 | + |
def op_stream(fdp):
    """Read all remaining fuzz bytes through a UTF-8 StreamReader.

    Args:
        fdp: Fuzz data provider; this function calls its ConsumeBytes and
            remaining_bytes methods.

    The bytes are wrapped in an in-memory io.BytesIO and read via
    codecs.getreader('utf-8') with the 'replace' error handler.
    The decoded text is discarded.
    """
    data = fdp.ConsumeBytes(fdp.remaining_bytes())
    bio = io.BytesIO(data)
    reader = codecs.getreader('utf-8')(bio, 'replace')
    reader.read()
| 69 | + |
# Fuzzes CPython's codec infrastructure (Modules/cjkcodecs/, Python/codecs.c).
# One-shot encode/decode covers the CJK codecs (Shift-JIS, EUC-JP, GB2312,
# Big5, ISO-2022-JP, EUC-KR, GB18030, Big5-HKSCS) and Western/Unicode codecs
# (UTF-7/8/16/32, charmap, unicode_escape, latin-1, ...).
# Incremental encode/decode covers only the shorter INC_DEC_CODECS and
# INC_ENC_CODECS lists.
# Also tests stream-based reading via codecs.getreader().
def FuzzerRunOne(FuzzerInput):
    """Run one fuzz iteration: pick an operation and execute it.

    Args:
        FuzzerInput: Raw fuzz input. Inputs that are empty or longer than
            0x100000 are ignored.

    The first value drawn from the input selects the operation; the rest of
    the input is consumed by the selected op_* function.
    """
    if not 1 <= len(FuzzerInput) <= 0x100000:
        return
    fdp = FuzzedDataProvider(FuzzerInput)
    op = fdp.ConsumeIntInRange(OP_DECODE, OP_STREAM_READ)
    handlers = {
        OP_DECODE: op_decode,
        OP_ENCODE: op_encode,
        OP_INCREMENTAL_DECODE: op_incremental_decode,
        OP_INCREMENTAL_ENCODE: op_incremental_encode,
        OP_STREAM_READ: op_stream,
    }
    try:
        # Any selector without an explicit entry falls back to op_stream.
        handlers.get(op, op_stream)(fdp)
    except Exception:
        # Deliberate best-effort: Python-level exceptions raised by the
        # operations are ignored so the harness keeps running.
        pass
0 commit comments