blob: 46031faedba1af02f9d01ee67989008244beef44 [file] [log] [blame]
# Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
# for details. All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
# This file contains a set of utilities for parsing minidumps.
import ctypes
import mmap
import os
import sys
class Enum(object):
def __init__(self, type, name2value):
self.name2value = name2value
self.value2name = {v: k for k, v in name2value.items()}
self.type = type
def from_raw(self, v):
if v not in self.value2name:
return 'Unknown(' + str(v) + ')'
return self.value2name[v]
def to_raw(self, v):
return self.name2value[v]
class Descriptor(object):
"""A handy wrapper over ctypes.Structure"""
def __init__(self, fields):
self.fields = fields
self.ctype = Descriptor._GetCtype(fields)
self.size = ctypes.sizeof(self.ctype)
def Read(self, address):
return self.ctype.from_address(address)
@staticmethod
def _GetCtype(fields):
raw_fields = []
wrappers = {}
for field in fields:
(name, type) = field
if isinstance(type, Enum):
raw_fields.append(('_raw_' + name, type.type))
wrappers[name] = type
else:
raw_fields.append(field)
class Raw(ctypes.Structure):
_fields_ = raw_fields
_pack_ = 1
def __getattribute__(self, name):
if name in wrappers:
return wrappers[name].from_raw(
getattr(self, '_raw_' + name))
else:
return ctypes.Structure.__getattribute__(self, name)
def __repr__(self):
return '{' + ', '.join(
'%s: %s' % (field, self.__getattribute__(field))
for field, _ in fields) + '}'
return Raw
# Structures below are based on the information in the MSDN pages and
# Breakpad/Crashpad sources.
MINIDUMP_HEADER = Descriptor([('signature', ctypes.c_uint32),
('version', ctypes.c_uint32),
('stream_count', ctypes.c_uint32),
('stream_directories_rva', ctypes.c_uint32),
('checksum', ctypes.c_uint32),
('time_date_stampt', ctypes.c_uint32),
('flags', ctypes.c_uint64)])
MINIDUMP_LOCATION_DESCRIPTOR = Descriptor([('data_size', ctypes.c_uint32),
('rva', ctypes.c_uint32)])
MINIDUMP_STREAM_TYPE = {
'MD_UNUSED_STREAM': 0,
'MD_RESERVED_STREAM_0': 1,
'MD_RESERVED_STREAM_1': 2,
'MD_THREAD_LIST_STREAM': 3,
'MD_MODULE_LIST_STREAM': 4,
'MD_MEMORY_LIST_STREAM': 5,
'MD_EXCEPTION_STREAM': 6,
'MD_SYSTEM_INFO_STREAM': 7,
'MD_THREAD_EX_LIST_STREAM': 8,
'MD_MEMORY_64_LIST_STREAM': 9,
'MD_COMMENT_STREAM_A': 10,
'MD_COMMENT_STREAM_W': 11,
'MD_HANDLE_DATA_STREAM': 12,
'MD_FUNCTION_TABLE_STREAM': 13,
'MD_UNLOADED_MODULE_LIST_STREAM': 14,
'MD_MISC_INFO_STREAM': 15,
'MD_MEMORY_INFO_LIST_STREAM': 16,
'MD_THREAD_INFO_LIST_STREAM': 17,
'MD_HANDLE_OPERATION_LIST_STREAM': 18,
}
MINIDUMP_DIRECTORY = Descriptor([('stream_type',
Enum(ctypes.c_uint32, MINIDUMP_STREAM_TYPE)),
('location',
MINIDUMP_LOCATION_DESCRIPTOR.ctype)])
MINIDUMP_MISC_INFO_2 = Descriptor([
('SizeOfInfo', ctypes.c_uint32),
('Flags1', ctypes.c_uint32),
('ProcessId', ctypes.c_uint32),
('ProcessCreateTime', ctypes.c_uint32),
('ProcessUserTime', ctypes.c_uint32),
('ProcessKernelTime', ctypes.c_uint32),
('ProcessorMaxMhz', ctypes.c_uint32),
('ProcessorCurrentMhz', ctypes.c_uint32),
('ProcessorMhzLimit', ctypes.c_uint32),
('ProcessorMaxIdleState', ctypes.c_uint32),
('ProcessorCurrentIdleState', ctypes.c_uint32),
])
MINIDUMP_MISC1_PROCESS_ID = 0x00000001
# A helper to get a raw address of the memory mapped buffer returned by
# mmap.
def BufferToAddress(buf):
obj = ctypes.py_object(buf)
address = ctypes.c_void_p()
length = ctypes.c_ssize_t()
ctypes.pythonapi.PyObject_AsReadBuffer(obj, ctypes.byref(address),
ctypes.byref(length))
return address.value
class MinidumpFile(object):
"""Class for reading minidump files."""
_HEADER_MAGIC = 0x504d444d
def __init__(self, minidump_name):
self.minidump_name = minidump_name
self.minidump_file = open(minidump_name, 'r')
self.minidump = mmap.mmap(
self.minidump_file.fileno(), 0, access=mmap.ACCESS_READ)
self.minidump_address = BufferToAddress(self.minidump)
self.header = self.Read(MINIDUMP_HEADER, 0)
if self.header.signature != MinidumpFile._HEADER_MAGIC:
raise Exception('Unsupported minidump header magic')
self.directories = []
offset = self.header.stream_directories_rva
for _ in range(self.header.stream_count):
self.directories.append(self.Read(MINIDUMP_DIRECTORY, offset))
offset += MINIDUMP_DIRECTORY.size
def GetProcessId(self):
for dir in self.directories:
if dir.stream_type == 'MD_MISC_INFO_STREAM':
info = self.Read(MINIDUMP_MISC_INFO_2, dir.location.rva)
if info.Flags1 & MINIDUMP_MISC1_PROCESS_ID != 0:
return info.ProcessId
return -1
def Read(self, what, offset):
return what.Read(self.minidump_address + offset)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.minidump.close()
self.minidump_file.close()
# Returns process id of the crashed process recorded in the given minidump.
def GetProcessIdFromDump(path):
try:
with MinidumpFile(path) as f:
return int(f.GetProcessId())
except:
return -1