Flux Krea Batch Processing and Automation Guide

Scaling AI image generation requires efficient batch processing and automation strategies. This comprehensive guide covers techniques for processing multiple images, automating workflows, and building production-ready systems with Flux Krea.

Understanding Batch Processing Benefits

Batch processing transforms AI image generation from individual tasks to scalable operations:

  • Efficiency Gains: Process hundreds of images without manual intervention
  • Resource Optimization: Better GPU utilization through parallel processing
  • Cost Reduction: Lower per-image generation costs
  • Quality Consistency: Uniform settings across large image sets
  • Time Savings: Hands-off processing of large jobs

Basic Batch Processing Implementation

Simple Batch Script

import torch
from diffusers import FluxPipeline
import json
import os
from datetime import datetime

class FluxBatchProcessor:
    def __init__(self, model_path="black-forest-labs/FLUX.1-krea-dev"):
        self.pipe = FluxPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16  # FLUX weights are trained in bf16; fp16 can overflow
        )
        self.pipe.to("cuda")
        
    def process_batch(self, prompts, output_dir="output", **kwargs):
        os.makedirs(output_dir, exist_ok=True)
        results = []
        
        for i, prompt in enumerate(prompts):
            print(f"Processing {i+1}/{len(prompts)}: {prompt[:50]}...")
            
            try:
                image = self.pipe(
                    prompt,
                    num_inference_steps=kwargs.get('steps', 28),  # Krea-dev follows FLUX.1-dev: ~28 steps (4 is a schnell setting)
                    guidance_scale=kwargs.get('guidance', 4.5),   # ~4.5 per the model card; 7.5 is a Stable Diffusion default
                    width=kwargs.get('width', 1024),
                    height=kwargs.get('height', 1024)
                ).images[0]
                
                # Save image
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"{timestamp}_{i:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath, quality=95)
                
                results.append({
                    "prompt": prompt,
                    "filename": filename,
                    "filepath": filepath,
                    "status": "success"
                })
                
            except Exception as e:
                print(f"Error processing prompt {i+1}: {e}")
                results.append({
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e)
                })
        
        return results

# Usage example
processor = FluxBatchProcessor()
prompts = [
    "Professional business headshot of confident executive",
    "Modern office interior with natural lighting",
    "Product photography of smartphone on white background"
]

results = processor.process_batch(prompts, output_dir="batch_output")
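The returned `results` list doubles as a run manifest. A minimal sketch of persisting it as JSON alongside the images for later auditing (the `save_manifest` helper is illustrative, not part of the class above):

```python
import json
from datetime import datetime

def save_manifest(results, path="batch_output/manifest.json"):
    """Persist batch results next to the images for later auditing."""
    manifest = {
        "generated_at": datetime.now().isoformat(),
        "total": len(results),
        "succeeded": sum(1 for r in results if r.get("status") == "success"),
        "results": results,
    }
    with open(path, "w") as f:
        json.dump(manifest, f, indent=2)
    return manifest
```

Because every entry records its prompt and status, the manifest makes it easy to re-queue only the failed prompts in a follow-up run.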

Advanced Batch Processing Strategies

Multi-GPU Batch Processing

Leverage multiple GPUs for maximum throughput:

import os
import torch
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from diffusers import FluxPipeline

class MultiGPUBatchProcessor:
    def __init__(self, num_gpus=None):
        self.num_gpus = num_gpus or torch.cuda.device_count()
        
    def process_on_gpu(self, gpu_id, prompts, output_dir, **kwargs):
        torch.cuda.set_device(gpu_id)
        
        pipe = FluxPipeline.from_pretrained(
            "black-forest-labs/FLUX.1-krea-dev",
            torch_dtype=torch.bfloat16
        )
        pipe.to(f"cuda:{gpu_id}")
        
        results = []
        for i, prompt in enumerate(prompts):
            try:
                image = pipe(prompt, **kwargs).images[0]
                filename = f"gpu{gpu_id}_{i:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                results.append({"prompt": prompt, "filename": filename})
            except Exception as e:
                results.append({"prompt": prompt, "error": str(e)})
        
        return results
    
    def process_batch_parallel(self, prompts, output_dir="output", **kwargs):
        # Distribute prompts across GPUs
        chunks = [prompts[i::self.num_gpus] for i in range(self.num_gpus)]
        
        # CUDA state does not survive fork(); spawn fresh worker processes
        with ProcessPoolExecutor(
            max_workers=self.num_gpus,
            mp_context=mp.get_context("spawn")
        ) as executor:
            futures = [
                executor.submit(self.process_on_gpu, i, chunks[i], output_dir, **kwargs)
                for i in range(self.num_gpus) if chunks[i]
            ]
            
            all_results = []
            for future in futures:
                all_results.extend(future.result())
                
        return all_results
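The `prompts[i::self.num_gpus]` slicing above deals prompts out round-robin rather than in contiguous blocks, which keeps per-GPU workloads balanced even when the prompt count doesn't divide evenly. The behavior in isolation:

```python
def round_robin_chunks(items, n):
    """Deal items across n workers, the same way process_batch_parallel does."""
    return [items[i::n] for i in range(n)]

prompts = ["p0", "p1", "p2", "p3", "p4"]
chunks = round_robin_chunks(prompts, 2)
# chunks[0] receives p0, p2, p4; chunks[1] receives p1, p3
```

With contiguous splitting, a batch of mixed-size jobs could leave one GPU idle while another works through a long tail; interleaving avoids that for free.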

CSV and Data-Driven Batch Processing

Processing from Spreadsheets

Enable non-technical users to manage batch jobs through CSV files:

import pandas as pd
import json

class CSVBatchProcessor(FluxBatchProcessor):
    def process_csv(self, csv_path, output_dir="output"):
        df = pd.read_csv(csv_path)
        os.makedirs(output_dir, exist_ok=True)
        results = []
        
        for index, row in df.iterrows():
            prompt = row['prompt']
            
            # Extract parameters from CSV columns, mapped to pipeline argument names
            params = {
                'width': int(row.get('width', 1024)),
                'height': int(row.get('height', 1024)),
                'num_inference_steps': int(row.get('steps', 28)),
                'guidance_scale': float(row.get('guidance', 4.5))
            }
            
            # Optional seed for reproducibility
            if 'seed' in row and pd.notna(row['seed']):
                params['generator'] = torch.Generator().manual_seed(int(row['seed']))
            
            print(f"Processing row {index+1}: {prompt[:50]}...")
            
            try:
                image = self.pipe(prompt, **params).images[0]
                
                # Use custom filename from CSV if provided (guard against empty cells)
                if 'filename' in row and pd.notna(row['filename']):
                    filename = row['filename']
                else:
                    filename = f"image_{index:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                
                results.append({
                    "row": index,
                    "prompt": prompt,
                    "filename": filename,
                    "status": "success",
                    # generator objects do not serialize; report only scalar params
                    **{k: v for k, v in params.items() if k != 'generator'}
                })
                
            except Exception as e:
                results.append({
                    "row": index,
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e)
                })
        
        # Save results report
        results_df = pd.DataFrame(results)
        results_df.to_csv(os.path.join(output_dir, "batch_results.csv"), index=False)
        
        return results

# Example CSV format:
# prompt,width,height,steps,guidance,seed,filename
# "Professional headshot",1024,1024,28,4.5,12345,"headshot_001.jpg"
# "Office interior",1024,768,32,5.0,67890,"office_001.jpg"

Queue-Based Processing Systems

Job Queue Implementation

Build scalable queue systems for enterprise-level batch processing:

import queue
import threading
import time
import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class GenerationJob:
    id: str
    prompt: str
    width: int = 1024
    height: int = 1024
    steps: int = 28
    guidance: float = 4.5
    seed: Optional[int] = None
    callback_url: Optional[str] = None
    priority: int = 0

class QueuedBatchProcessor:
    def __init__(self, num_workers=2):
        self.job_queue = queue.PriorityQueue()
        self.results = {}
        self.workers = []
        self.num_workers = num_workers
        self.running = False
        
    def add_job(self, job: GenerationJob):
        # PriorityQueue serves the lowest tuple first, so lower numbers mean
        # higher priority; the timestamp breaks ties first-in, first-out
        self.job_queue.put((job.priority, time.time(), job))
        
    def worker_thread(self, worker_id):
        # Each worker loads its own full pipeline; budget GPU memory accordingly
        pipe = FluxPipeline.from_pretrained(
            "black-forest-labs/FLUX.1-krea-dev",
            torch_dtype=torch.bfloat16
        )
        pipe.to("cuda")
        
        while self.running:
            try:
                priority, timestamp, job = self.job_queue.get(timeout=1)
                
                print(f"Worker {worker_id} processing job {job.id}")
                
                # Generate image
                params = {
                    'num_inference_steps': job.steps,
                    'guidance_scale': job.guidance,
                    'width': job.width,
                    'height': job.height
                }
                
                if job.seed is not None:  # 0 is a valid seed; don't treat it as falsy
                    params['generator'] = torch.Generator().manual_seed(job.seed)
                
                image = pipe(job.prompt, **params).images[0]
                
                # Save result
                filename = f"{job.id}.jpg"
                os.makedirs("output", exist_ok=True)
                image.save(os.path.join("output", filename))
                
                self.results[job.id] = {
                    "status": "completed",
                    "filename": filename,
                    "prompt": job.prompt,
                    "worker_id": worker_id
                }
                
                self.job_queue.task_done()
                
            except queue.Empty:
                continue
            except Exception as e:
                self.results[job.id] = {
                    "status": "error",
                    "error": str(e),
                    "worker_id": worker_id
                }
                self.job_queue.task_done()  # mark failed jobs done so join() can still return
    
    def start_workers(self):
        self.running = True
        for i in range(self.num_workers):
            worker = threading.Thread(target=self.worker_thread, args=(i,))
            worker.start()
            self.workers.append(worker)
    
    def stop_workers(self):
        self.running = False
        for worker in self.workers:
            worker.join()
        self.workers.clear()
    
    def get_job_status(self, job_id):
        return self.results.get(job_id, {"status": "queued"})
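The `(priority, timestamp, job)` tuple scheme can be verified in isolation: `PriorityQueue` serves the lowest tuple first, so a priority-0 job jumps ahead of priority-5 work submitted earlier, and the timestamp keeps equal-priority jobs first-in, first-out:

```python
import queue
import time

q = queue.PriorityQueue()
# Lower priority numbers are served first; time.time() breaks ties FIFO
for name, prio in [("bulk_job", 5), ("urgent_job", 0), ("normal_job", 2)]:
    q.put((prio, time.time(), name))

order = [q.get()[2] for _ in range(3)]
# order: urgent_job, then normal_job, then bulk_job
```

One caveat of the tuple approach: if two jobs ever share both priority and timestamp, the queue falls back to comparing the third element, so that element must be comparable (strings here; a dataclass would need `order=True` or a wrapper).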

Template-Based Automation

Prompt Template System

Automate prompt generation for consistent output across variations:

class PromptTemplateProcessor:
    def __init__(self):
        self.templates = {
            "product_photo": "Professional product photography of {product} on {background}, {lighting} lighting, high resolution, commercial quality",
            "portrait": "Professional {style} portrait of {subject}, {age} years old, {expression}, {lighting}, shot with {lens}",
            "interior": "{style} {room} interior with {lighting}, {color_scheme} color scheme, {furniture}, architectural photography"
        }
        
    def generate_variations(self, template_name, variations):
        template = self.templates.get(template_name)
        if not template:
            raise ValueError(f"Template {template_name} not found")
        
        prompts = []
        for variation in variations:
            prompt = template.format(**variation)
            prompts.append(prompt)
        
        return prompts
    
    def process_template_batch(self, template_name, variations, output_dir):
        prompts = self.generate_variations(template_name, variations)
        
        processor = FluxBatchProcessor()
        results = processor.process_batch(prompts, output_dir)
        
        # Add variation metadata to results
        for i, result in enumerate(results):
            result['variation_data'] = variations[i]
        
        return results

# Example usage
template_processor = PromptTemplateProcessor()

product_variations = [
    {"product": "smartphone", "background": "white background", "lighting": "studio"},
    {"product": "laptop", "background": "wood surface", "lighting": "natural"},
    {"product": "headphones", "background": "black background", "lighting": "dramatic"}
]

results = template_processor.process_template_batch(
    "product_photo", 
    product_variations, 
    "product_output"
)

Quality Control and Filtering

Automated Quality Assessment

Implement quality checks to filter and sort batch results:

import cv2
import numpy as np
from PIL import Image

class QualityFilterProcessor(FluxBatchProcessor):
    def assess_image_quality(self, image_path):
        # Load image (cv2.imread returns None rather than raising on unreadable files)
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        
        # Calculate sharpness using Laplacian variance
        sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
        
        # Calculate brightness
        brightness = np.mean(gray)
        
        # Calculate contrast
        contrast = gray.std()
        
        # Simple quality score
        quality_score = (sharpness * contrast) / (brightness + 1)
        
        return {
            "sharpness": float(sharpness),
            "brightness": float(brightness),
            "contrast": float(contrast),
            "quality_score": float(quality_score)
        }
    
    def process_with_quality_filter(self, prompts, output_dir, min_quality=100):
        # Generate all images first
        results = self.process_batch(prompts, output_dir)
        
        # Assess quality and filter
        filtered_results = []
        for result in results:
            if result.get("status") == "success":
                quality = self.assess_image_quality(result["filepath"])
                result["quality_metrics"] = quality
                
                if quality["quality_score"] >= min_quality:
                    result["quality_passed"] = True
                    filtered_results.append(result)
                else:
                    result["quality_passed"] = False
                    # Optionally delete low-quality images
                    # os.remove(result["filepath"])
        
        return filtered_results
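The Laplacian-variance sharpness measure is simple enough to verify by hand: a flat or blurred image has small second derivatives everywhere, so its Laplacian response has near-zero variance, while a hard edge produces large positive and negative responses. A dependency-free sketch on toy grayscale grids (plain nested lists standing in for image arrays):

```python
def laplacian_variance(img):
    """Variance of the 4-neighbour Laplacian, a proxy for sharpness."""
    h, w = len(img), len(img[0])
    responses = []
    for y in range(1, h - 1):
        for x in range(1, w - 1):
            # 4-neighbour Laplacian: sum of neighbours minus 4x centre
            lap = (img[y-1][x] + img[y+1][x] + img[y][x-1] + img[y][x+1]
                   - 4 * img[y][x])
            responses.append(lap)
    mean = sum(responses) / len(responses)
    return sum((r - mean) ** 2 for r in responses) / len(responses)

sharp = [[0, 0, 255, 255]] * 4   # hard vertical edge
flat = [[128] * 4] * 4           # uniform gray
# the edge image scores far higher than the flat one
```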

Monitoring and Progress Tracking

Real-time Progress Monitoring

import json
import time
from datetime import datetime

class MonitoredBatchProcessor(FluxBatchProcessor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.progress_file = "batch_progress.json"
        
    def update_progress(self, current, total, status="processing", extra_data=None):
        progress_data = {
            "timestamp": datetime.now().isoformat(),
            "current": current,
            "total": total,
            "percentage": round((current / total) * 100, 2),
            "status": status,
            "estimated_remaining": self.estimate_remaining_time(current, total),
            "extra_data": extra_data or {}
        }
        
        with open(self.progress_file, 'w') as f:
            json.dump(progress_data, f, indent=2)
    
    def estimate_remaining_time(self, current, total):
        if not hasattr(self, 'start_time') or current == 0:
            return "calculating..."
        
        elapsed = time.time() - self.start_time
        rate = current / elapsed
        remaining_items = total - current
        remaining_seconds = remaining_items / rate if rate > 0 else 0
        
        return f"{remaining_seconds:.0f} seconds"
    
    def process_batch_monitored(self, prompts, output_dir="output", **kwargs):
        self.start_time = time.time()
        total = len(prompts)
        results = []
        
        os.makedirs(output_dir, exist_ok=True)
        
        for i, prompt in enumerate(prompts):
            self.update_progress(i, total, "processing", {
                "current_prompt": prompt[:50] + "..." if len(prompt) > 50 else prompt
            })
            
            try:
                image = self.pipe(prompt, **kwargs).images[0]
                filename = f"image_{i:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                
                results.append({
                    "prompt": prompt,
                    "filename": filename,
                    "filepath": filepath,
                    "status": "success"
                })
                
            except Exception as e:
                results.append({
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e)
                })
        
        self.update_progress(total, total, "completed")
        return results
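The ETA arithmetic inside `estimate_remaining_time` is easy to isolate and unit-test: remaining time is simply the remaining item count divided by the average completion rate observed so far. A standalone sketch of that calculation:

```python
def estimate_remaining_seconds(completed, total, elapsed_seconds):
    """Project remaining time from the average completion rate so far."""
    if completed == 0 or elapsed_seconds <= 0:
        return None  # not enough data to estimate yet
    rate = completed / elapsed_seconds  # items per second
    return (total - completed) / rate

# 10 of 40 items done in 50 seconds -> 0.2 items/s -> 150 s remaining
eta = estimate_remaining_seconds(10, 40, 50.0)
```

Note that this average-rate estimate lags behind reality when generation time varies per prompt (e.g. mixed resolutions); a moving average over the last few items reacts faster.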

Integration with External Systems

Web API for Batch Processing

from flask import Flask, request, jsonify
import uuid
import threading

app = Flask(__name__)
batch_processor = QueuedBatchProcessor(num_workers=2)
batch_processor.start_workers()

@app.route('/api/batch/submit', methods=['POST'])
def submit_batch():
    data = request.json
    prompts = data.get('prompts', [])
    
    if not prompts:
        return jsonify({"error": "No prompts provided"}), 400
    
    batch_id = str(uuid.uuid4())
    job_ids = []
    
    for i, prompt in enumerate(prompts):
        job_id = f"{batch_id}_{i:03d}"
        job = GenerationJob(
            id=job_id,
            prompt=prompt,
            width=data.get('width', 1024),
            height=data.get('height', 1024),
            steps=data.get('steps', 28),
            guidance=data.get('guidance', 4.5)
        )
        
        batch_processor.add_job(job)
        job_ids.append(job_id)
    
    return jsonify({
        "batch_id": batch_id,
        "job_ids": job_ids,
        "status": "submitted"
    })

@app.route('/api/batch/<batch_id>/status', methods=['GET'])
def get_batch_status(batch_id):
    # Get status of all jobs in batch
    job_statuses = {}
    for key, result in batch_processor.results.items():
        if key.startswith(batch_id):
            job_statuses[key] = result
    
    completed = sum(1 for r in job_statuses.values() if r.get("status") == "completed")
    total = len(job_statuses)  # only counts jobs with results; queued jobs have no entry yet
    
    return jsonify({
        "batch_id": batch_id,
        "total_jobs": total,
        "completed_jobs": completed,
        "job_statuses": job_statuses
    })

if __name__ == '__main__':
    # use_reloader=False: the debug reloader re-imports the module and would
    # spin up a second set of worker threads
    app.run(debug=True, use_reloader=False)
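A matching client can drive the API with nothing beyond the standard library. A sketch using `urllib` (it assumes the Flask app above is running locally on port 5000; `build_batch_payload` is an illustrative helper, not part of the server):

```python
import json
import urllib.request

API_ROOT = "http://localhost:5000/api/batch"  # assumed local deployment

def build_batch_payload(prompts, width=1024, height=1024, steps=28, guidance=4.5):
    """Assemble the JSON body expected by /api/batch/submit."""
    return {"prompts": prompts, "width": width, "height": height,
            "steps": steps, "guidance": guidance}

def submit_batch(prompts, **settings):
    """POST a batch and return the server's batch_id/job_ids response."""
    payload = json.dumps(build_batch_payload(prompts, **settings)).encode()
    req = urllib.request.Request(
        f"{API_ROOT}/submit", data=payload,
        headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Polling `/api/batch/<batch_id>/status` in a loop after submission gives a simple progress display; for long batches, the `callback_url` field on GenerationJob is the hook for push-style notification instead.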

Performance Optimization for Batch Processing

Memory Management

Optimize memory usage for large batch operations:

  • Model Caching: Load model once, reuse across generations
  • Batch Size Tuning: Find optimal batch size for your GPU
  • Memory Cleanup: Clear GPU memory between batches
  • CPU Offloading: Move models to CPU when not in use
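The cleanup step can be wrapped in a small helper that degrades gracefully when CUDA, or torch itself, is unavailable (a minimal sketch; call it between batches, not between individual images, since `empty_cache()` has a cost):

```python
import gc

def cleanup_between_batches():
    """Release Python-side references, then free cached GPU memory if possible."""
    gc.collect()  # drop unreachable tensors so their memory can be reclaimed
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()  # return cached blocks to the driver
    except ImportError:
        pass  # CPU-only environment: nothing more to free
```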

Production Deployment Considerations

Scalability and Reliability

  • Load Balancing: Distribute work across multiple servers
  • Error Handling: Robust error recovery and retry mechanisms
  • Monitoring: Comprehensive logging and alerting
  • Resource Management: Automatic scaling based on queue size
  • Data Persistence: Reliable storage for jobs and results
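Retry logic is one of the easier reliability pieces to build in directly. A minimal exponential-backoff sketch (the attempt count and base delay are illustrative; in practice you would also restrict retries to transient errors such as CUDA out-of-memory or network failures):

```python
import time

def with_retries(fn, max_attempts=3, base_delay=1.0):
    """Call fn, retrying on failure with exponentially growing delays."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
```

Wrapping the per-prompt generation call (`lambda: pipe(prompt, **params)`) in `with_retries` keeps a single transient failure from marking an entire job as errored.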

Conclusion

Effective batch processing and automation transforms Flux Krea from a single-image tool into a production-ready system capable of handling enterprise-scale image generation requirements. The techniques covered in this guide provide the foundation for building sophisticated, scalable AI image generation workflows.

Start with simple batch scripts and gradually implement more advanced features like multi-GPU processing, quality filtering, and API integration as your needs grow. Remember to monitor performance and optimize for your specific hardware and use case requirements.