| """Utility functions for querying CPU, memory, disk, etc.""" |
| |
| from __future__ import absolute_import |
| import os |
| |
| |
| NET_DIRECTION_RX = 0 |
| NET_DIRECTION_TX = 1 |
| |
| _NET_STAT_ROOT = '/sys/class/net/' |
| _NET_STAT_BYTES = { |
| NET_DIRECTION_RX: 'rx_bytes', |
| NET_DIRECTION_TX: 'tx_bytes' |
| } |
| |
| |
| def get_free_disk_fraction(directory=None): |
| """Returns free disk space. From 0.0 (disk is full) to 1.0 (disk is empty).""" |
| if not directory: |
| directory = os.getcwd() |
| statvfs = os.statvfs(directory) |
| return float(statvfs.f_bavail) / float(statvfs.f_blocks) |
| |
| |
| def get_network_if(): |
| """Returns list of network interfaces on host.""" |
| try: |
| return [net_if for net_if in os.listdir(_NET_STAT_ROOT) if net_if != 'lo'] |
| except OSError: |
| return [] |
| |
| |
| def get_current_directional_traffic(if_list=None, direction=NET_DIRECTION_RX): |
| """Returns current network interfaces directional statistic in bytes.""" |
| if not if_list: |
| if_list = get_network_if() |
| bytes_count = 0 |
| for net_if in if_list: |
| stat_path = os.path.join( |
| _NET_STAT_ROOT, net_if, 'statistics', _NET_STAT_BYTES[direction]) |
| try: |
| with open(stat_path) as stat: |
| bytes_count += int(stat.read()) |
| except OSError: |
| pass |
| return bytes_count |
| |
| |
| def get_current_total_traffic(if_list=None): |
| """Returns current rx, tx and total network interfaces statistic in bytes.""" |
| rx_bytes = get_current_directional_traffic(if_list, NET_DIRECTION_RX) |
| tx_bytes = get_current_directional_traffic(if_list, NET_DIRECTION_TX) |
| return rx_bytes, tx_bytes, rx_bytes + tx_bytes |