Implementing DeepSpeed for Scalable Transformers: Advanced Training with Gradient Checkpointing and Parallelism


In this advanced DeepSpeed tutorial, we provide a hands-on walkthrough of optimization techniques for training large language models efficiently. By combining ZeRO optimization, mixed-precision training, gradient accumulation, and advanced DeepSpeed configurations, the tutorial demonstrates how to make better use of GPU memory, reduce training overhead, and scale transformer models in resource-constrained environments such as Colab. Alongside model creation and training, it also covers performance monitoring, inference, checkpointing, and benchmarking different ZeRO stages, giving practitioners both theoretical insight and practical code to accelerate model development.

import subprocess
import sys
import os
import json
import time
from pathlib import Path


def install_dependencies():
   """Install required packages for DeepSpeed in Colab"""
   print("🚀 Installing DeepSpeed and dependencies...")
  
   subprocess.check_call([
       sys.executable, "-m", "pip", "install",
       "torch", "torchvision", "torchaudio", "--index-url",
       "https://download.pytorch.org/whl/cu118"
   ])
  
   subprocess.check_call([sys.executable, "-m", "pip", "install", "deepspeed"])
  
   subprocess.check_call([
       sys.executable, "-m", "pip", "install",
       "transformers", "datasets", "accelerate", "wandb"
   ])
  
   print("✅ Installation complete!")


install_dependencies()


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import deepspeed
from transformers import GPT2Config, GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
from typing import Dict, Any
import argparse

We set up our Colab environment by installing PyTorch with CUDA support, DeepSpeed, and essential libraries such as Transformers, Datasets, Accelerate, and Weights & Biases. With everything in place, we can build and train models with DeepSpeed.
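
Before moving on, it can help to confirm that the packages actually import in the current runtime. The following is a minimal sketch using only the standard library; the helper name `missing_packages` is our own and is not part of any of the installed libraries.

```python
import importlib.util
from typing import Iterable, List


def missing_packages(names: Iterable[str]) -> List[str]:
    """Return the top-level package names that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    required = ["torch", "deepspeed", "transformers", "datasets", "accelerate", "wandb"]
    missing = missing_packages(required)
    if missing:
        print(f"Missing packages: {', '.join(missing)}")
    else:
        print("All required packages are importable.")
```

If the list is non-empty, rerunning the install cell (or restarting the runtime after installation) is usually the first thing to try.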

class SyntheticTextDataset(Dataset):
   """Synthetic dataset for demonstration purposes"""

   def __init__(self, size: int = 1000, seq_length: int = 512, vocab_size: int = 50257):
       self.size = size
       self.seq_length = seq_length
       self.vocab_size = vocab_size

       # Random token IDs stand in for tokenized text
       self.data = torch.randint(0, vocab_size, (size, seq_length))

   def __len__(self):
       return self.size

   def __getitem__(self, idx):
       return {
           'input_ids': self.data[idx],
           # GPT2LMHeadModel shifts labels internally, so labels are a copy of the inputs
           'labels': self.data[idx].clone()
       }

We create a SyntheticTextDataset that generates random token sequences to mimic real text data. We use these sequences as both inputs and labels, which lets us quickly test DeepSpeed training without relying on a large external dataset.
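
Because the tokens are drawn uniformly at random, there is nothing for the model to learn beyond the uniform distribution, so the cross-entropy loss should hover around ln(vocab_size). The sketch below uses only the standard library and helper names of our own. It computes that reference value and shows the input/target alignment a causal language model uses when the labels are a copy of the inputs.

```python
import math
from typing import List, Sequence, Tuple


def uniform_baseline_loss(vocab_size: int) -> float:
    """Cross-entropy (in nats) of a uniform prediction over vocab_size tokens."""
    return math.log(vocab_size)


def next_token_pairs(tokens: Sequence[int]) -> List[Tuple[int, int]]:
    """Pair each token with the token that follows it (the prediction target)."""
    return list(zip(tokens[:-1], tokens[1:]))


if __name__ == "__main__":
    print(f"Reference loss for vocab 50257: {uniform_baseline_loss(50257):.2f}")
    print(next_token_pairs([11, 22, 33, 44]))
```

In a short run on this dataset, a training loss that stays close to roughly 10.8 is therefore expected rather than a sign of a misconfigured setup.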

class AdvancedDeepSpeedTrainer:
   """Advanced DeepSpeed trainer with multiple optimization techniques"""

   def __init__(self, model_config: Dict[str, Any], ds_config: Dict[str, Any]):
       self.model_config = model_config
       self.ds_config = ds_config
       self.model = None
       self.engine = None
       self.tokenizer = None

   def create_model(self):
       """Create a GPT-2 style model for demonstration"""
       print("🧠 Creating model...")

       config = GPT2Config(
           vocab_size=self.model_config['vocab_size'],
           n_positions=self.model_config['seq_length'],
           n_embd=self.model_config['hidden_size'],
           n_layer=self.model_config['num_layers'],
           n_head=self.model_config['num_heads'],
           resid_pdrop=0.1,
           embd_pdrop=0.1,
           attn_pdrop=0.1,
       )

       # The model is randomly initialized; only the tokenizer is loaded from the pretrained 'gpt2' files
       self.model = GPT2LMHeadModel(config)
       self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

       # GPT-2 has no pad token, so reuse the end-of-sequence token
       self.tokenizer.pad_token = self.tokenizer.eos_token

       print(f"📊 Model parameters: {sum(p.numel() for p in self.model.parameters()):,}")
       return self.model
  
   def create_deepspeed_config(self):
       """Create a comprehensive DeepSpeed configuration"""
       return {
           "train_batch_size": self.ds_config['train_batch_size'],
           "train_micro_batch_size_per_gpu": self.ds_config['micro_batch_size'],
           "gradient_accumulation_steps": self.ds_config['gradient_accumulation_steps'],
          
           "zero_optimization": {
               "stage": self.ds_config['zero_stage'],
               "allgather_partitions": True,
               "allgather_bucket_size": 5e8,
               "overlap_comm": True,
               "reduce_scatter": True,
               "reduce_bucket_size": 5e8,
               "contiguous_gradients": True,
               "cpu_offload": self.ds_config.get('cpu_offload', False)
           },
          
           "fp16": {
               "enabled": True,
               "loss_scale": 0,
               "loss_scale_window": 1000,
               "initial_scale_power": 16,
               "hysteresis": 2,
               "min_loss_scale": 1
           },
          
           "optimizer": {
               "sort": "AdamW",
               "params": {
                   "lr": self.ds_config['learning_rate'],
                   "betas": [0.9, 0.999],
                   "eps": 1e-8,
                   "weight_decay": 0.01
               }
           },
          
           "scheduler": {
               "sort": "WarmupLR",
               "params": {
                   "warmup_min_lr": 0,
                   "warmup_max_lr": self.ds_config['learning_rate'],
                   "warmup_num_steps": 100
               }
           },
          
           "gradient_clipping": 1.0,
          
           "wall_clock_breakdown": True,
          
           "memory_breakdown": True,
          
           "tensorboard": {
               "enabled": True,
               "output_path": "./logs/",
               "job_name": "deepspeed_advanced_tutorial"
           }
       }
  
   def initialize_deepspeed(self):
       """Initialize DeepSpeed engine"""
       print("⚡ Initializing DeepSpeed...")
      
       parser = argparse.ArgumentParser()
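       parser.add_argument('--local_rank', type=int, default=0)
       args = parser.parse_args([])

       # Pass the parameters explicitly so the optimizer defined in the config is built for them
       self.engine, optimizer, _, lr_scheduler = deepspeed.initialize(
           args=args,
           model=self.model,
           model_parameters=self.model.parameters(),
           config=self.create_deepspeed_config()
       )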
      
       print(f"🎯 DeepSpeed engine initialized with ZeRO stage {self.ds_config['zero_stage']}")
       return self.engine
  
   def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
       """Perform a single training step with DeepSpeed optimizations"""

       input_ids = batch['input_ids'].to(self.engine.device)
       labels = batch['labels'].to(self.engine.device)

       outputs = self.engine(input_ids=input_ids, labels=labels)
       loss = outputs.loss

       # DeepSpeed applies loss scaling and gradient accumulation inside backward() and step()
       self.engine.backward(loss)

       self.engine.step()

       return {
           'loss': loss.item(),
           'lr': self.engine.lr_scheduler.get_last_lr()[0] if self.engine.lr_scheduler else 0
       }
  
   def train(self, dataloader: DataLoader, num_epochs: int = 2):
       """Full training loop with monitoring"""
       print(f"🏋️ Starting training for {num_epochs} epochs...")

       self.engine.train()
       total_steps = 0

       for epoch in range(num_epochs):
           epoch_loss = 0.0
           epoch_steps = 0

           print(f"\n📈 Epoch {epoch + 1}/{num_epochs}")
          
           for step, batch in enumerate(dataloader):
               start_time = time.time()
              
               metrics = self.train_step(batch)
              
               epoch_loss += metrics['loss']
               epoch_steps += 1
               total_steps += 1
              
               if step % 10 == 0:
                   step_time = time.time() - start_time
                   print(f"  Step {step:4d} | Loss: {metrics['loss']:.4f} | "
                         f"LR: {metrics['lr']:.2e} | Time: {step_time:.3f}s")
              
               if step % 20 == 0 and hasattr(self.engine, 'monitor'):
                   self.log_memory_stats()
              
               if step >= 50: 
                   break
          
           avg_loss = epoch_loss / epoch_steps
           print(f"📊 Epoch {epoch + 1} completed | Average Loss: {avg_loss:.4f}")

       print("🎉 Training completed!")
  
   def log_memory_stats(self):
       """Log GPU memory statistics"""
       if torch.cuda.is_available():
           allocated = torch.cuda.memory_allocated() / 1024**3  # bytes to GiB
           reserved = torch.cuda.memory_reserved() / 1024**3
           print(f"  💾 GPU Memory - Allocated: {allocated:.2f}GB | Reserved: {reserved:.2f}GB")
  
   def save_checkpoint(self, path: str):
       """Save model checkpoint using DeepSpeed"""
       print(f"💾 Saving checkpoint to {path}")
       self.engine.save_checkpoint(path)
  
   def demonstrate_inference(self, text: str = "The future of AI is"):
       """Demonstrate inference with the DeepSpeed-wrapped model"""
       print(f"\n🔮 Running inference with prompt: '{text}'")

       inputs = self.tokenizer.encode(text, return_tensors="pt").to(self.engine.device)

       self.engine.eval()

       with torch.no_grad():
           # engine.module is the underlying Hugging Face model, which provides generate()
           outputs = self.engine.module.generate(
               inputs,
               max_length=inputs.shape[1] + 50,
               num_return_sequences=1,
               temperature=0.8,
               do_sample=True,
               pad_token_id=self.tokenizer.eos_token_id
           )

       generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
       print(f"📝 Generated text: {generated_text}")

       # Switch back to training mode
       self.engine.train()

We build an end-to-end trainer that creates a GPT-2 model, sets up a DeepSpeed config (ZeRO, FP16, AdamW, warmup scheduler, TensorBoard), and initializes the engine. We then run training steps with logging and memory statistics, save checkpoints, and demonstrate inference, so that optimization and generation can be verified in one place.
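
DeepSpeed expects the three batch settings in the config to agree: the global batch size should equal the micro-batch size per GPU times the gradient accumulation steps times the number of GPUs. A minimal sketch of that check follows; the function name and the `world_size` argument are our own illustration, not a DeepSpeed API.

```python
def batch_config_is_consistent(
    train_batch_size: int,
    micro_batch_size_per_gpu: int,
    gradient_accumulation_steps: int,
    world_size: int = 1,
) -> bool:
    """Check train_batch_size == micro_batch * accumulation_steps * number of GPUs."""
    expected = micro_batch_size_per_gpu * gradient_accumulation_steps * world_size
    return train_batch_size == expected


if __name__ == "__main__":
    # Values used in this tutorial on a single Colab GPU
    print(batch_config_is_consistent(16, 4, 4, world_size=1))
    # The same settings on two GPUs would need train_batch_size=32
    print(batch_config_is_consistent(16, 4, 4, world_size=2))
```

Running a check like this before calling `deepspeed.initialize` makes a mismatch visible early, which matters when the same config is reused on a machine with a different number of GPUs.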

def run_advanced_tutorial():
   """Main function to run the advanced DeepSpeed tutorial"""

   print("🌟 Advanced DeepSpeed Tutorial Starting...")
   print("=" * 60)
  
   model_config = {
       'vocab_size': 50257,
       'seq_length': 512,
       'hidden_size': 768, 
       'num_layers': 6,    
       'num_heads': 12
   }
  
   ds_config = {
       'train_batch_size': 16,
       'micro_batch_size': 4,
       'gradient_accumulation_steps': 4,
       'zero_stage': 2, 
       'learning_rate': 1e-4,
       'cpu_offload': False 
   }
  
   print("📋 Configuration:")
   # Rough estimate: token embeddings plus about 12 * hidden_size^2 weights per transformer layer
   approx_params = (
       model_config['vocab_size'] * model_config['hidden_size']
       + 12 * model_config['num_layers'] * model_config['hidden_size'] ** 2
   )
   print(f"  Model size: ~{approx_params / 1e6:.1f}M parameters")
   print(f"  ZeRO Stage: {ds_config['zero_stage']}")
   print(f"  Batch size: {ds_config['train_batch_size']}")

   trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)

   model = trainer.create_model()

   engine = trainer.initialize_deepspeed()

   print("\n📚 Creating synthetic dataset...")
   dataset = SyntheticTextDataset(
       size=200,
       seq_length=model_config['seq_length'],
       vocab_size=model_config['vocab_size']
   )
  
   dataloader = DataLoader(
       dataset,
       batch_size=ds_config['micro_batch_size'],
       shuffle=True
   )
  
   print("\n📊 Pre-training memory stats:")
   trainer.log_memory_stats()

   trainer.train(dataloader, num_epochs=2)

   print("\n📊 Post-training memory stats:")
   trainer.log_memory_stats()

   trainer.demonstrate_inference("DeepSpeed enables efficient training of")

   checkpoint_path = "./deepspeed_checkpoint"
   trainer.save_checkpoint(checkpoint_path)

   demonstrate_zero_stages()
   demonstrate_memory_optimization()

   print("\n🎯 Tutorial completed successfully!")
   print("Key DeepSpeed features demonstrated:")
   print("  ✅ ZeRO optimization for memory efficiency")
   print("  ✅ Mixed precision training (FP16)")
   print("  ✅ Gradient accumulation")
   print("  ✅ Learning rate scheduling")
   print("  ✅ Checkpoint saving")
   print("  ✅ Memory monitoring")


def demonstrate_zero_stages():
   """Describe the different ZeRO optimization stages"""
   print("\n🔧 ZeRO Optimization Stages Explained:")
   print("  Stage 0: Disabled (baseline)")
   print("  Stage 1: Optimizer state partitioning (up to ~4x memory reduction)")
   print("  Stage 2: Gradient partitioning (up to ~8x memory reduction)")
   print("  Stage 3: Parameter partitioning (~Nx memory reduction for N GPUs)")

   # Example zero_optimization fragments for each stage (shown for reference only)
   zero_configs = {
       1: {"stage": 1, "reduce_bucket_size": 5e8},
       2: {"stage": 2, "allgather_partitions": True, "reduce_scatter": True},
       3: {"stage": 3, "stage3_prefetch_bucket_size": 5e8, "stage3_param_persistence_threshold": 1e6}
   }

   for stage, config in zero_configs.items():
       estimated_memory_reduction = ["1", "4", "8", "N"][stage]
       print(f"  📉 Stage {stage}: up to ~{estimated_memory_reduction}x memory reduction")


def demonstrate_memory_optimization():
   """Show memory optimization techniques"""
   print("\n🧠 Memory Optimization Techniques:")
   print("  🔄 Gradient Checkpointing: Trade compute for memory")
   print("  📤 CPU Offloading: Move optimizer states to CPU")
   print("  🗜️ Compression: Reduce communication overhead")
   print("  ⚡ Mixed Precision: Use FP16 for faster training")

We orchestrate the full training run: set configs, build the GPT-2 model and DeepSpeed engine, create a synthetic dataset, monitor GPU memory, train for two epochs, run inference, and save a checkpoint. We then explain the ZeRO stages and highlight memory-optimization tactics, such as gradient checkpointing and CPU offloading, to understand the trade-offs in practice.
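
The reduction factors quoted for each ZeRO stage are limits reached as the number of GPUs grows. Following the accounting in the ZeRO paper for mixed-precision Adam (2 bytes per parameter for FP16 weights, 2 for FP16 gradients, and 12 for FP32 optimizer states), the sketch below estimates model-state memory per GPU. It ignores activations and temporary buffers, and the function name and the parameter count in the demo are our own illustrative choices.

```python
def zero_model_state_bytes(num_params: int, stage: int, num_gpus: int) -> float:
    """Estimate per-GPU bytes for weights, gradients, and Adam states under ZeRO."""
    if stage not in (0, 1, 2, 3):
        raise ValueError("stage must be 0, 1, 2, or 3")
    if num_gpus < 1:
        raise ValueError("num_gpus must be at least 1")

    weights = 2.0 * num_params      # FP16 parameters
    gradients = 2.0 * num_params    # FP16 gradients
    optimizer = 12.0 * num_params   # FP32 weight copy, momentum, variance

    if stage >= 1:
        optimizer /= num_gpus       # Stage 1 partitions optimizer states
    if stage >= 2:
        gradients /= num_gpus       # Stage 2 also partitions gradients
    if stage >= 3:
        weights /= num_gpus         # Stage 3 also partitions parameters
    return weights + gradients + optimizer


if __name__ == "__main__":
    params = 80_000_000  # illustrative parameter count, not a measured value
    for gpus in (1, 8):
        for stage in (0, 1, 2, 3):
            gib = zero_model_state_bytes(params, stage, gpus) / 1024**3
            print(f"GPUs={gpus} stage={stage}: {gib:.2f} GiB")
```

By this estimate, partitioning alone does not shrink model states on a single GPU, since there is only one rank to partition across; on one Colab GPU, savings are more likely to come from options such as CPU offloading.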

class DeepSpeedConfigGenerator:
   """Utility class to generate DeepSpeed configurations"""
  
   @staticmethod
   def generate_config(
       batch_size: int = 16,
       zero_stage: int = 2,
       use_cpu_offload: bool = False,
       learning_rate: float = 1e-4
   ) -> Dict[str, Any]:
       """Generate a complete DeepSpeed configuration"""
      
       config = {
           "train_batch_size": batch_size,
           "train_micro_batch_size_per_gpu": max(1, batch_size // 4),
           "gradient_accumulation_steps": max(1, batch_size // max(1, batch_size // 4)),
          
           "zero_optimization": {
               "stage": zero_stage,
               "allgather_partitions": True,
               "allgather_bucket_size": 5e8,
               "overlap_comm": True,
               "reduce_scatter": True,
               "reduce_bucket_size": 5e8,
               "contiguous_gradients": True
           },
          
           "fp16": {
               "enabled": True,
               "loss_scale": 0,
               "loss_scale_window": 1000,
               "initial_scale_power": 16,
               "hysteresis": 2,
               "min_loss_scale": 1
           },

           "optimizer": {
               "type": "AdamW",
               "params": {
                   "lr": learning_rate,
                   "betas": [0.9, 0.999],
                   "eps": 1e-8,
                   "weight_decay": 0.01
               }
           },

           "scheduler": {
               "type": "WarmupLR",
               "params": {
                   "warmup_min_lr": 0,
                   "warmup_max_lr": learning_rate,
                   "warmup_num_steps": 100
               }
           },
          
           "gradient_clipping": 1.0,
           "wall_clock_breakdown": True
       }
      
       if use_cpu_offload:
           # "offload_optimizer" supersedes the deprecated "cpu_offload" flag;
           # pin_memory belongs inside this sub-section
           config["zero_optimization"]["offload_optimizer"] = {
               "device": "cpu",
               "pin_memory": True
           }

       if zero_stage == 3:
           config["zero_optimization"].update({
               "stage3_prefetch_bucket_size": 5e8,
               "stage3_param_persistence_threshold": 1e6,
               "stage3_gather_16bit_weights_on_model_save": True
           })
      
       return config


def benchmark_zero_stages():
   """Benchmark different ZeRO stages"""
   print("\n🏁 Benchmarking ZeRO Stages...")
  
   model_config = {
       'vocab_size': 50257,
       'seq_length': 256,
       'hidden_size': 512,
       'num_layers': 4,
       'num_heads': 8
   }
  
   results = {}

   for stage in [1, 2]:
       print(f"\n🔬 Testing ZeRO Stage {stage}...")
      
       ds_config = {
           'train_batch_size': 8,
           'micro_batch_size': 2,
           'gradient_accumulation_steps': 4,
           'zero_stage': stage,
           'learning_rate': 1e-4
       }
      
       try:
           trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)
           model = trainer.create_model()
           engine = trainer.initialize_deepspeed()

           if torch.cuda.is_available():
               torch.cuda.reset_peak_memory_stats()

               dataset = SyntheticTextDataset(size=20, seq_length=model_config['seq_length'])
               dataloader = DataLoader(dataset, batch_size=ds_config['micro_batch_size'])

               # Time five micro-batch steps
               start_time = time.time()
               for i, batch in enumerate(dataloader):
                   if i >= 5:
                       break
                   trainer.train_step(batch)

               end_time = time.time()
               peak_memory = torch.cuda.max_memory_allocated() / 1024**3

               results[stage] = {
                   'peak_memory_gb': peak_memory,
                   'time_per_step': (end_time - start_time) / 5
               }

               print(f"  📊 Peak Memory: {peak_memory:.2f}GB")
               print(f"  ⏱️ Time per step: {results[stage]['time_per_step']:.3f}s")

           # Release the engine before testing the next stage
           del trainer, model, engine
           torch.cuda.empty_cache()

       except Exception as e:
           print(f"  ❌ Error with stage {stage}: {str(e)}")

   if len(results) > 1:
       print(f"\n📈 Comparison:")
       stage_1_memory = results.get(1, {}).get('peak_memory_gb', 0)
       stage_2_memory = results.get(2, {}).get('peak_memory_gb', 0)

       if stage_1_memory > 0 and stage_2_memory > 0:
           memory_reduction = (stage_1_memory - stage_2_memory) / stage_1_memory * 100
           print(f"  🎯 Memory reduction from Stage 1 to 2: {memory_reduction:.1f}%")


def demonstrate_advanced_features():
   """Describe additional advanced DeepSpeed features"""
   print("\n🚀 Advanced DeepSpeed Features:")

   print("  🎚️ Dynamic Loss Scaling: Automatically adjusts FP16 loss scaling")

   print("  🗜️ Gradient Compression: Reduces communication overhead")

   print("  🔄 Pipeline Parallelism: Splits model across devices")

   print("  🧑‍🎓 Expert Parallelism: Efficient Mixture-of-Experts training")

   print("  📚 Curriculum Learning: Progressive training strategies")


if __name__ == "__main__":
   print(f"🖥️ CUDA Available: {torch.cuda.is_available()}")
   if torch.cuda.is_available():
       print(f"   GPU: {torch.cuda.get_device_name()}")
       print(f"   Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")

   try:
       run_advanced_tutorial()

       benchmark_zero_stages()

       demonstrate_advanced_features()

   except Exception as e:
       print(f"❌ Error during tutorial: {str(e)}")
       print("💡 Tips for troubleshooting:")
       print("  - Ensure you have GPU runtime enabled in Colab")
       print("  - Try reducing batch_size or model size if facing memory issues")
       print("  - Enable CPU offloading in ds_config if needed")

We generate reusable DeepSpeed configurations, benchmark ZeRO stages to compare memory and speed, and showcase advanced features such as dynamic loss scaling and pipeline/MoE parallelism. We also detect CUDA, run the full tutorial end-to-end, and provide clear troubleshooting tips, allowing us to iterate confidently in Colab.
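
Dynamic loss scaling, which the `fp16` settings in the configs above control, can be pictured as a small state machine: halve the scale and skip the update when gradients overflow, and double it after a run of overflow-free steps. The class below is a simplified illustration of that idea written for this article. It is not DeepSpeed's implementation, it omits the `hysteresis` setting, and its name and parameters are our own.

```python
class SimpleLossScaler:
    """Toy dynamic loss scaler: back off on overflow, grow after clean steps."""

    def __init__(self, initial_scale_power: int = 16, scale_window: int = 1000,
                 min_scale: float = 1.0):
        self.scale = 2.0 ** initial_scale_power
        self.scale_window = scale_window
        self.min_scale = min_scale
        self.good_steps = 0  # consecutive steps without overflow

    def update(self, overflow: bool) -> bool:
        """Update the scale; return True if the optimizer step should be applied."""
        if overflow:
            # Gradients contained inf/NaN: shrink the scale and skip this step
            self.scale = max(self.scale / 2.0, self.min_scale)
            self.good_steps = 0
            return False
        self.good_steps += 1
        if self.good_steps % self.scale_window == 0:
            # A full window of clean steps: try a larger scale
            self.scale *= 2.0
        return True


if __name__ == "__main__":
    scaler = SimpleLossScaler(initial_scale_power=4, scale_window=2)
    for overflow in (True, False, False):
        applied = scaler.update(overflow)
        print(f"overflow={overflow} applied={applied} scale={scaler.scale}")
```

A skipped step after an overflow is normal behavior early in FP16 training, so an occasional overflow message in the logs does not by itself indicate a problem.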

In conclusion, we gain a comprehensive understanding of how DeepSpeed improves model training efficiency by striking a balance between performance and memory trade-offs. From leveraging ZeRO stages for memory reduction to applying FP16 mixed precision and CPU offloading, the tutorial showcases techniques that make large-scale training accessible on modest hardware. By the end, learners will have trained and optimized a GPT-style model, benchmarked configurations, monitored GPU resources, and explored advanced features such as pipeline parallelism and gradient compression.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform has over 2 million monthly views, illustrating its popularity among audiences.
