Community Post

Custom integration of the Thread Manager to support a third party Python library

Running Depth Anything V2 in TouchDesigner with support from the new Thread Manager and Python Environment Manager

! Before getting started, we invite you to explore the following article: Introducing the TouchDesigner Python Environment Manager (TDPyEnvManager), and to read the related documentation.

This article will rely on using the TDPyEnvManager.

Create an empty folder and name it TDDepthAnything.

Create a new .toe file at the root of that new folder, and name it TDDepthAnything.toe.

Remove any content in /project1.

From the palette, drag and drop:

  • TDPyEnvManager (from the palette Tools folder)
  • Logger (from the palette Tools folder)
  • ThreadsMonitor (from the palette ThreadManager folder)

Setting up our environment

On the TDPyEnvManager, toggle on the Active parameter, click Continue in the dialog, and click the pulse parameter Create vEnv.

When you see the Status parameter stating “Environment linked and ready.”, we are good to go.

Click on Open CLI; a new terminal window should open with the TDDepthAnything vEnv activated.

We’ll first install PyTorch, with GPU support on Windows and the CPU* library on MacOS.

*Note: Our MacOS example will rely on MPS, the Torch backend for Metal, to run the AI model with hardware acceleration.

Head to this handy PyTorch tool: https://pytorch.org/get-started/locally/

Windows users will pick CUDA 12.8.0, which is the version we are also using in TouchDesigner 2025.30000+.

You should get something like: pip install torch torchvision torchaudio --index-url <https://download.pytorch.org/whl/cu128>

MacOS users should get something along these lines: pip install torch torchvision torchaudio

This is going to take a little while to install; grab a cup of coffee, or your favorite potion.

Next, we will install Hugging Face’s Transformers library: pip install transformers
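Optionally, before closing the CLI, you can run a quick sanity check on the install (a minimal sketch; the versions and availability flags you see will depend on your machine):

python -c "import torch, transformers; print(torch.__version__, transformers.__version__, torch.cuda.is_available(), torch.backends.mps.is_available())"

On Windows with an Nvidia GPU you should see True for CUDA; on Apple Silicon you should see True for MPS.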

Once you are done with this step, you can close the CLI.

Creating a TDDepthAnything COMP

We are going to create a new component from scratch.

It’s going to involve a bit of Python; I’ll share snippets you can copy and paste.

That being said, you can also download the full component from Github: https://github.com/TouchDesigner/TDDepthAnything

First, we’ll start by setting up our COMP:

  • Create a new Base COMP
  • Name that Base COMP TDDepthAnything
  • In TDDepthAnything COMP, add a Movie File In TOP
  • Connect the Movie File In TOP to the second input of a new In TOP
  • Connect the In TOP to a Null TOP
  • Rename the Null TOP to inputImage
  • Connect the inputImage Null TOP to a Script TOP
  • Connect the Script TOP to an Out TOP
  • Go up a level to /project1 and grab the logger COMP we dragged and dropped from the Palette earlier. Move it inside the TDDepthAnything COMP; it should now be at /project1/TDDepthAnything/logger

The idea here is that we’ll run the model on the input frames coming through to inputImage and dump the result of the model inference into the Script TOP.

We'll design an extension to run all this.

The main features we need to cover:

  • Download the model
  • Upload the model to the GPU
  • Trigger inference on the incoming image
  • Bonus: Unload the model, run inference continuously

Ok, let’s start with downloading the model.

Go up a level in your network.

Right click on the TDDepthAnything COMP, and click on Customize Component...

Go in the Extensions section and click on the arrow.

Type TDDepthAnythingExt next to Extension 1.

Add the extension.

While we are here, we are going to add a few custom parameters.

Create a page named TD Depth Anything, add:

  • A header par named Depth Anything
  • A pulse par named Load Model
  • A pulse par named Unload Model
  • A toggle par named Running Inference
  • A pulse par named Trigger Inference
  • A pulse par named Reset
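If you prefer scripting over the Component Editor, the same page and parameters can also be created from the textport; a rough sketch (the internal names match what the extension code below expects, while the header name Depthanything is an assumption based on its label):

page = op('/project1/TDDepthAnything').appendCustomPage('TD Depth Anything')
page.appendHeader('Depthanything', label='Depth Anything')
page.appendPulse('Loadmodel', label='Load Model')
page.appendPulse('Unloadmodel', label='Unload Model')
page.appendToggle('Runninginference', label='Running Inference')
page.appendPulse('Triggerinference', label='Trigger Inference')
page.appendPulse('Reset', label='Reset')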

 

Sweet.

Let’s go into the TDDepthAnything COMP and set up a couple more things before editing our extension.

We want to relay some events happening on the parameters to our extension.

Let’s add a Parameter Execute DAT, and turn on the Value Change and On Pulse toggles.

In the DAT, we’ll write some generic code to forward the events to our extension:

def onValueChange(par, prev):
	if hasattr(parent(), 'OnValueChange'+par.name):
		getattr(parent(), 'OnValueChange'+par.name)(par, prev)
	return
 
def onPulse(par):
	if hasattr(parent(), 'OnPulse'+par.name):
		getattr(parent(), 'OnPulse'+par.name)(par)	
	return

Now, let’s move to our extension. You will find it in the TDDepthAnything COMP, at TDDepthAnything/TDDepthAnythingExt. With the forwarding code above, pulsing Load Model (internal name Loadmodel) will call OnPulseLoadmodel on the extension, and the same pattern applies to our other parameters.

We will first setup a few attributes that we will want to use, and the methods related to downloading our model.

Clean up the extension's default code, and add the imports we will need later on (threading, gc, NumPy, PyTorch, and the Transformers classes), so that it looks like this:

import threading
import gc

import numpy as np
import torch
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

from TDStoreTools import StorageManager
import TDFunctions as TDF
 
class TDDepthAnythingExt:
	"""
	TDDepthAnythingExt description
	"""
	def __init__(self, ownerComp):
		# The component to which this extension is attached
		self.ownerComp = ownerComp

Following that last line, we will add a few promoted attributes for easy access across our code.

		# Utilities
		self.Logger = self.ownerComp.op('logger')
		self.SafeLogger = self.Logger.Logger
		self.ThreadManager = op.TDResources.ThreadManager
		self.InputImage = self.ownerComp.op('inputImage')
		self.ScriptBuffer = self.ownerComp.op('script1')

We are going to interact with objects across threads, so we will add a lock to make sure we interact with those objects safely. Locks will protect us from potential race conditions across threads.

Add a line below self.ScriptBuffer:

		self.DepthAnythingLock = threading.Lock()

We will also add attributes to hold our image processor and model:

		self.ImageProc = None
		self.Model = None

Finally, we’ll need somewhere to store the inference result so we can manipulate it before writing it to the Script TOP buffer:

		self.NpDepth = np.random.randint(0, high=255, size=(720, 1280, 4), dtype='uint16')
		self.ScriptBuffer.copyNumpyArray(self.NpDepth)

In the above snippet, we are creating a random NumPy array to fill our Script TOP buffer with.

Last, we’ll add an attribute to prevent inference from running when an inference task is already happening, and an attribute that will tell us where to run the model based on your system spec:

		self.IsReady = False # Using IsReady to prevent inference when model is already running.
		self.Device ='cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'

Your final __init__ method should look like:

	def __init__(self, ownerComp):
		# The component to which this extension is attached
		self.ownerComp = ownerComp
 
		# Utilities
		self.Logger = self.ownerComp.op('logger')
		self.SafeLogger = self.Logger.Logger
		self.ThreadManager = op.TDResources.ThreadManager
		self.InputImage = self.ownerComp.op('inputImage')
		self.ScriptBuffer = self.ownerComp.op('script1')
 
		self.DepthAnythingLock = threading.Lock()
 
		# ML Generics
		self.Model = None
		self.ImageProc = None
 
		self.NpDepth = np.random.randint(0, high=255, size=(720, 1280, 4), dtype='uint16')
		self.ScriptBuffer.copyNumpyArray(self.NpDepth)
 
		self.IsReady = False # Using IsReady to prevent inference when model is already running.
 
		self.Device ='cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
 
		self.Logger.Debug('TDDepthAnything init done.', self)

OK, we are pretty much set to start implementing our model download methods.

We’ll start from the origin: the method that triggers downloading the model and uploading it to the device on which it will run.

If you look at the Parameter Execute DAT we implemented earlier, as well as the parameters, you can see the method name we are going to use: OnPulseLoadmodel.

It might seem a bit redundant, but I like to keep those parameter-bound methods to a minimum.

	def OnPulseLoadmodel(self, par):
		self.LoadModelThreaded()
		self.Logger.Debug('Loading Model Triggered')

I like to name my threaded methods with the name of the action, in our case LoadModel, followed by Threaded. We will implement the blocking method as LoadModel, along with LoadModelSuccess, LoadModelExcept, and last LoadModelRefresh.

The model we will use for that example is the small checkpoint version, Depth-Anything-V2-Small-hf: https://huggingface.co/depth-anything/Depth-Anything-V2-Small-hf

We will cache the model in a checkpoints folder at the root of our project folder.
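The checkpoints folder does not need to exist ahead of time (Transformers should create the cache directory when downloading), but if you prefer to create it explicitly, a minimal sketch:

from pathlib import Path

# Optional: create the cache folder at the project root ahead of time.
Path(project.folder, 'checkpoints').mkdir(parents=True, exist_ok=True)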

	def LoadModelThreaded(self):
		modelName = 'Depth-Anything-V2-Small-hf'
		modelPath = f'depth-anything/{modelName}'
		checkpointsCache = f'{project.folder}/checkpoints/'
 
		self.SafeLogger.debug(f'Downloading model from HuggingFace or fetching from cache: {modelName}')
 
		myTDTask = self.ThreadManager.TDTask(
			target=self.LoadModel,
			SuccessHook=self.LoadModelSuccess,
			ExceptHook=self.LoadModelExcept,
			RefreshHook=self.LoadModelRefresh,
			args=(modelPath, checkpointsCache)
		)
		self.ThreadManager.EnqueueTask(myTDTask)
		self.SafeLogger.debug('Model thread started')

This is our threaded method implementation.

Next is the blocking LoadModel.

In LoadModel, for the sake of demonstrating the Thread Manager infoQueue, we will put a few infoDicts in the queue to update our Threads Monitor UI. We are also setting the Progress attribute with SetProgressSafe, a safe method which uses a lock internally in the Thread Manager.

Other than those, the main methods to use are the following:

# First the image processor, which is used on images before being passed to the model
# This is for the model to understand how to interpret the image
AutoImageProcessor.from_pretrained(modelPath, cache_dir=checkpointsCache)
# Then the model itself, which is used to predict the depth of the image
AutoModelForDepthEstimation.from_pretrained(modelPath, cache_dir=checkpointsCache)
# We are now uploading the model to the device our extension initialized with.
# CUDA if on Windows with an Nvidia GPU, MPS if on MacOS, or CPU.
model.to(device=self.Device)
# We are in a thread but we want to use our image processor and model 
# later from other threads
self.PushToImageProc(image_processor)
self.PushToModel(model)

This is our full LoadModel implementation:

	def LoadModel(self, modelPath:str, checkpointsCache:str=None):
		currentTDThread = threading.current_thread()
 
		currentTDThread.SetProgressSafe(.0)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
 
		image_processor = AutoImageProcessor.from_pretrained(modelPath, cache_dir=checkpointsCache)
 
		currentTDThread.SetProgressSafe(.15)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
 
		model = AutoModelForDepthEstimation.from_pretrained(modelPath, cache_dir=checkpointsCache)
 
		currentTDThread.SetProgressSafe(.2)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
 
		model.to(device=self.Device)
 
		currentTDThread.SetProgressSafe(.6)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
 
		self.PushToImageProc(image_processor)
 
		currentTDThread.SetProgressSafe(.9)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
 
		self.PushToModel(model)
 
		currentTDThread.SetProgressSafe(1.0)
		infoDict = {
				'id': str(currentTDThread.ident),
				'name': currentTDThread.name,
				'messageType': 'TD_TM_TaskProgressUpdate',
				'progress': float(currentTDThread.Progress),
				'state': 'Processing'
		}
		currentTDThread.InfoQueue.put(infoDict)
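The progress-reporting block above repeats several times. If you want to reduce the duplication, you could factor it into a small helper on the extension (a sketch only, using the same thread attributes shown above; this helper is not part of the original component):

	def _ReportProgress(self, progress, state='Processing'):
		# Hypothetical helper: must be called from within a running TDTask thread.
		# Updates the thread's progress and pushes an info dict for the Threads Monitor UI.
		currentTDThread = threading.current_thread()
		currentTDThread.SetProgressSafe(progress)
		currentTDThread.InfoQueue.put({
			'id': str(currentTDThread.ident),
			'name': currentTDThread.name,
			'messageType': 'TD_TM_TaskProgressUpdate',
			'progress': float(currentTDThread.Progress),
			'state': state
		})

LoadModel could then call, for example, self._ReportProgress(.15) at each step instead of repeating the block.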

We now need to implement PushToImageProc and PushToModel. While we are at it we will also implement our GetImageProc and GetModel.

	def PushToModel(self, model):
		with self.DepthAnythingLock:
			self.Model = model
 
	def GetModel(self):
		with self.DepthAnythingLock:
			return self.Model
 
	def PushToImageProc(self, imageProc):
		with self.DepthAnythingLock:
			self.ImageProc = imageProc
 
	def GetImageProc(self):
		with self.DepthAnythingLock:
			return self.ImageProc

Last, our callbacks:

	def LoadModelSuccess(self):
		model = self.GetModel()
		# The model is now available, allow inference to start.
		self.IsReady = True
		self.Logger.Info(f'The model {model} is loaded and ready for use.')
 
	def LoadModelExcept(self, *args):
		self.SafeLogger.error(f'An error occurred while trying to load the model, see thread exception for details. {args}')
 
	def LoadModelRefresh(self):
		self.Logger.Debug(f'Model is loading, please wait.')
		return

If all is set up correctly, you can now pulse the Load Model parameter and the model will be downloaded from the Hugging Face servers, added to your checkpoints folder cache, and uploaded to your GPU. While this is happening, you’ll see messages printed in the textport and the threadsMonitor COMP will show the progress.

In the textport, you can type op('/project1/TDDepthAnything').GetModel() and you should see the model's info (DepthAnythingForDepthEstimation) printed in the textport.

Next, we write the inference methods.

When clicking on the Trigger Inference pulse parameter, we want to call a DepthInferenceThreaded method:

	def OnPulseTriggerinference(self, par):
		self.DepthInferenceThreaded()
		self.Logger.Debug('Running inference')

We will add a check in DepthInferenceThreaded to see if a new inference task can actually be created. Maybe our machine is already running a task and we are waiting for a result.

	def DepthInferenceThreaded(self):
		if self.IsReady:
			self.IsReady = False
			image = self.InputImage.numpyArray(delayed=False, writable=False)
			myTDTask = self.ThreadManager.TDTask(
				target=self.DepthInference,
				SuccessHook=self.DepthInferenceSuccess,
				ExceptHook=self.DepthInferenceExcept,
				RefreshHook=self.DepthInferenceRefresh,
				args=(image,)
			)			
			self.ThreadManager.EnqueueTask(myTDTask, self.Device)
 
		else:
			if self.Model:
				self.Logger.Debug('Inference is already running, please wait.')
				return
 
			self.Logger.Debug('The model is not loaded, please load the model first.')
			return

The above code snippet is very similar to our previous threaded method; we first get our image as a NumPy array before creating our inference task.

As simple checks, we assume that if we are not ready to run inference but the model is loaded, we are already running inference, otherwise it means our model is not loaded.

Now let’s implement the DepthInference method, our blocking method.

First, we want to get our image processor and our model safely. We’ll use:

		imageProc = self.GetImageProc()
		model = self.GetModel()

And we need to make sure that our NumPy array is ready to be used by our image processor and the model.

We’ll write a small method to do this:

	def PreprocessTDNpArray(self, image):
		image = image[:, :, :3]  # Remove alpha channel
		image = image[:, :, ::-1]  # Convert BGR to RGB
		image = image.astype(np.float32) * 255
		return image

which we will use after checking that we have indeed an imageProc and model to work with:

	def DepthInference(self, image, device):
		self.SafeLogger.debug('Starting Inference...')
 
		imageProc = self.GetImageProc()
		model = self.GetModel()
 
		if imageProc and model:
			# Default npArray shape from script TOP is (h, w, 4), we are getting rid of the alpha channel.
			image = self.PreprocessTDNpArray(image)

Next, we want to get the resolution of our image. We can get to it using the first 2 values of our NumPy Array shape: image.shape[:2]

We will store this in a res variable.

We’ll run the image processor on our NumPy array to prepare the model inputs, and make sure the data is on the right device.

			inputs = imageProc(images=image, return_tensors="pt")
			inputs = {k: v.to(device) for k, v in inputs.items()}

We can now proceed and run the model:

			with torch.no_grad():
				outputs = model(**inputs)

The with torch.no_grad() block is an optimization that disables gradient tracking, which we don’t need when running inference.

We’ll get the model output, and pass this to the HuggingFace post processor for depth estimation.

			prediction = imageProc.post_process_depth_estimation(
				outputs,
				target_sizes=[res],
			)
 
			prediction = prediction[0]["predicted_depth"]

Then we prepare the final depth data to be written to the Script TOP, with a dedicated method:

	def PostprocessPrediction(self, prediction, res):
		prediction = (prediction - prediction.min()) / (prediction.max() - prediction.min())  # Normalize to [0, 1]
		prediction *= 65535  # Scale to 16-bit
		prediction = prediction.cpu().numpy().astype(np.uint16)
 
		# Create a buffer with 4 channels
		buffer = np.zeros((res[0], res[1], 4), dtype=np.uint16)
		buffer[:, :, 0] = prediction  # Assign depth to the first channel
		return buffer

We’ll use it in our DepthInference code, and finish by setting our NpDepth value.

			buffer = self.PostprocessPrediction(prediction, res)
			self.SetNpDepth(buffer)
 
			self.SafeLogger.debug(f'Finished Inference... NpDepth = {self.NpDepth}')

Here is the full code for inference:

	def DepthInference(self, image, device):
		self.SafeLogger.debug('Starting Inference...')
 
		imageProc = self.GetImageProc()
		model = self.GetModel()
 
		if imageProc and model:
			# Default npArray shape from script TOP is (h, w, 4), we are getting rid of the alpha channel.
			image = self.PreprocessTDNpArray(image)
			res = image.shape[:2]
 
			inputs = imageProc(images=image, return_tensors="pt")
			inputs = {k: v.to(device) for k, v in inputs.items()}
 
			with torch.no_grad():
				outputs = model(**inputs)
 
			# interpolate to original size
			# We are downscaling so that it doesn't fill memory
			prediction = imageProc.post_process_depth_estimation(
				outputs,
				target_sizes=[res],
			)
 
			prediction = prediction[0]["predicted_depth"]
			buffer = self.PostprocessPrediction(prediction, res)
			self.SetNpDepth(buffer)
 
			self.SafeLogger.debug(f'Finished Inference... NpDepth = {self.NpDepth}')
 
		else:
			self.SafeLogger.debug('The Image Pre-Processor and/or the model is/are not initialized, aborting.')
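Note that DepthInference stores its result with self.SetNpDepth, and the success callback below reads it with self.GetNpDepth. Those accessors are not shown in the snippets here; following the same lock-protected pattern as PushToModel/GetModel, they would look something like this (a sketch, see the full file on GitHub for the actual implementation):

	def SetNpDepth(self, npDepth):
		with self.DepthAnythingLock:
			self.NpDepth = npDepth
 
	def GetNpDepth(self):
		with self.DepthAnythingLock:
			return self.NpDepth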

Our callbacks are again fairly straightforward:

	def DepthInferenceSuccess(self):
		self.IsReady = True
		self.ScriptBuffer.copyNumpyArray(self.GetNpDepth())
 
	def DepthInferenceExcept(self, *args):
		self.SafeLogger.error(f'The inference failed. See exception details. {args}')
 
	def DepthInferenceRefresh(self):
		self.Logger.Info('The inference is running, please wait.')

The main thing to note is that it’s on success that we push our new NpDepth to the Script TOP.

Other than that, we have some very simple methods which speak for themselves:

	def OnPulseUnloadmodel(self, par):
		self.UnloadModelThreaded()
		self.Logger.Debug('Unloading Model')
 
	def UnloadModel(self):
		try:
			model = self.GetModel()
			imageProc = self.GetImageProc()
			if model:
				self.PushToModel(None)
 
			if imageProc:
				self.PushToImageProc(None)
 
			gc.collect()
			torch.cuda.empty_cache()
 
		except Exception as e:
			self.SafeLogger.debug(f'An error occurred while unloading the model: {e}')
 
	def UnloadModelThreaded(self):
		myTDTask = self.ThreadManager.TDTask(target=self.UnloadModel)
		self.ThreadManager.EnqueueTask(myTDTask)
		self.SafeLogger.debug(myTDTask)

These are to unload our model. The difference here is that our threaded method is only using a target and doesn’t setup the callbacks.

And a reset, in case it’s needed:

	def OnPulseReset(self, par):
		self.UnloadModelThreaded()
		self.SetNpDepth(np.random.randint(0, high=255, size=(2, 2, 4), dtype='uint16'))
		self.ScriptBuffer.copyNumpyArray(self.GetNpDepth())
		self.IsReady = False		
		return

The full file is available on Github: https://github.com/TouchDesigner/TDDepthAnything

Last, we’ll add a little feature that runs inference every time our input is cooking.

Add an OP Execute DAT below inputImage, and set it to monitor the inputImage operator.

Bind the Active par of the OP Execute DAT to the parent's parameter: parent.TDDepthAnything.par.Runninginference

Toggle the Post Cook toggle on, and add this line in the onPostCook callback: parent().DepthInferenceThreaded()
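For reference, the OP Execute DAT callback would look something like this (a minimal sketch):

def onPostCook(changeOp):
	# Each time inputImage finishes cooking, queue a new threaded inference.
	parent().DepthInferenceThreaded()
	return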

Now, when you toggle the Running Inference parameter on the COMP while your input is cooking, inference will run automatically.

! Note: This is not running in real time, and it’s heavily threaded. There is no queue or buffer design, and it’s not the fastest way to run this model; it means that not every single frame from the input will be processed.

Conclusion

This is an example of the many usages that are now enabled and easily brought into TouchDesigner thanks to the Thread Manager and the Python Environment Manager.

We hope that your testing goes well and that it brings many ideas for you to experiment with.

As always, feel free to share your creations in the forum and your components in the community section! If you have any issue with this tutorial, please leave a comment below.

TensorRT fork from Oleg Chomp w/ Thread Manager and TD Py Env Manager setup.

For users with modern Nvidia GPUs, I forked Oleg Chomp’s Depth Anything TensorRT component to rely on the Thread Manager and TDPyEnvManager, as well as HuggingFace Transformers, among other changes. The installation can now be done in just a couple of clicks, all from within TouchDesigner.

More details at: https://github.com/jetXS/TDDepthAnythingRT
