Skip to content

Instantly share code, notes, and snippets.

@kunalb
Created September 9, 2023 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kunalb/ac090dfd5c17049406aafe3faba83454 to your computer and use it in GitHub Desktop.
Save kunalb/ac090dfd5c17049406aafe3faba83454 to your computer and use it in GitHub Desktop.
Capture stderr
import codecs
import ctypes
import os
import sys
import threading
class StderrCapture:
    """Context manager that redirects file descriptor 2 to callbacks.

    While the ``with`` block is active, fd 2 (stderr) points at an anonymous
    in-memory file created with ``memfd_create`` (Linux/glibc only).  A
    background thread periodically swaps in a fresh memfd, reads everything
    written to the previous one, and passes the decoded text to ``f``.

    On exit the original stderr is restored and any remaining output is
    delivered to ``f`` before ``__exit__`` returns.

    Args:
        f: Callable invoked with each non-empty ``str`` of captured output.
            It is called from the background thread while the block runs and
            from the exiting thread for the final chunk.
        poll_interval: Seconds between checks for new output.
    """

    _STDERR_FD = 2
    _CHUNK_SIZE = 65536

    def __init__(self, f, poll_interval=0.05):
        # use_errno lets us report why memfd_create failed.
        self.libc = ctypes.CDLL('libc.so.6', use_errno=True)
        self.memfd_create = self.libc.memfd_create
        self.memfd_create.argtypes = [ctypes.c_char_p, ctypes.c_uint]
        self.memfd_create.restype = ctypes.c_int
        self.f = f
        self.fd = None  # memfd currently installed as fd 2, if any
        self.thread = None
        self.poll_interval = poll_interval
        self._saved_stderr = None  # duplicate of the original fd 2
        self._stop = threading.Event()
        # Incremental decoding keeps a multi-byte character that straddles
        # two memfds intact instead of raising UnicodeDecodeError.
        self._decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    @staticmethod
    def _flush_python_stderr():
        """Flush Python's buffered ``sys.stderr`` so bytes land on the right fd."""
        stream = sys.stderr
        if stream is None:
            return
        try:
            stream.flush()
        except (OSError, ValueError):
            # Closed or broken stream: nothing buffered that we can save.
            pass

    def _create_and_dup_fd(self, name):
        """Create a memfd called ``name``, install it as fd 2 and return it.

        Raises:
            OSError: if ``memfd_create`` or ``dup2`` fails.
        """
        fd = self.memfd_create(name, 0)
        if fd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        try:
            os.dup2(fd, self._STDERR_FD)
        except OSError:
            os.close(fd)
            raise
        return fd

    def _drain(self, fd, final=False):
        """Read all of ``fd`` from the start, close it, and hand text to ``f``."""
        try:
            os.lseek(fd, 0, os.SEEK_SET)
            chunks = []
            while True:
                chunk = os.read(fd, self._CHUNK_SIZE)
                if not chunk:
                    break
                chunks.append(chunk)
        finally:
            os.close(fd)
        data = self._decoder.decode(b"".join(chunks), final)
        if data:
            self.f(data)

    def _read_and_flush(self, fd):
        """Thread body: rotate memfds and forward output until asked to stop.

        ``fd`` is the memfd currently installed as stderr.  The descriptor
        that is active when the loop ends is left open for ``__exit__``.
        """
        while not self._stop.wait(self.poll_interval):
            if os.fstat(fd).st_size == 0:
                # Nothing written yet; avoid churning through empty memfds.
                continue
            new_fd = self._create_and_dup_fd(b"new_temp_file")
            # Publish the active descriptor before draining the old one so
            # __exit__ always knows which memfd still needs to be read.
            self.fd = new_fd
            self._drain(fd)
            fd = new_fd

    def __enter__(self):
        # Anything already buffered belongs to the real stderr.
        self._flush_python_stderr()
        self._saved_stderr = os.dup(self._STDERR_FD)
        try:
            self.fd = self._create_and_dup_fd(b"initial_temp_file")
        except BaseException:
            os.close(self._saved_stderr)
            self._saved_stderr = None
            raise
        self._stop.clear()
        self._decoder.reset()
        self.thread = threading.Thread(target=self._read_and_flush, args=(self.fd,))
        self.thread.daemon = True
        self.thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop the reader first so it cannot rotate fd 2 underneath us.
        self._stop.set()
        self.thread.join()
        # Push buffered Python-level output into the memfd before restoring.
        self._flush_python_stderr()
        try:
            os.dup2(self._saved_stderr, self._STDERR_FD)
        finally:
            os.close(self._saved_stderr)
            self._saved_stderr = None
        fd, self.fd = self.fd, None
        if fd is not None:
            # Deliver whatever was written since the last rotation.
            self._drain(fd, final=True)
# Custom function to handle captured data
def custom_function(data):
    """Handle one piece of captured stderr text.

    Placeholder callback: it accepts ``data`` and deliberately does nothing.
    Replace the body with the handling you need.
    """
    # Your code here
    return None
# Usage
with StderrCapture(custom_function) as capture:
    # Your code that produces output to stderr can go here.
    # A `with` statement needs at least one statement in its body; a comment
    # alone is a SyntaxError, so keep this placeholder until real code is added.
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment