Flux Krea Batch Processing and Automation Guide
Scaling AI image generation requires efficient batch processing and automation strategies. This comprehensive guide covers techniques for processing multiple images, automating workflows, and building production-ready systems with Flux Krea.
Understanding Batch Processing Benefits
Batch processing transforms AI image generation from individual tasks to scalable operations:
- Efficiency Gains: Process hundreds of images without manual intervention
- Resource Optimization: Better GPU utilization through parallel processing
- Cost Reduction: Lower per-image generation costs
- Quality Consistency: Uniform settings across large image sets
- Time Savings: Hands-off processing of large jobs
Basic Batch Processing Implementation
Simple Batch Script
import torch
from diffusers import FluxPipeline
import json
import os
from datetime import datetime
class FluxBatchProcessor:
    """Generate images for a list of prompts with a single Flux Krea pipeline.

    The pipeline is loaded once in ``__init__`` and reused for every prompt,
    avoiding the cost of reloading model weights per image.
    """

    def __init__(self, model_path="black-forest-labs/FLUX.1-krea-dev"):
        """Load the pipeline in half precision and move it to the default GPU."""
        self.pipe = FluxPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.float16,
        )
        self.pipe.to("cuda")

    def process_batch(self, prompts, output_dir="output", **kwargs):
        """Render every prompt and save the results as JPEGs under *output_dir*.

        Recognised keyword arguments (with defaults): ``steps`` (4),
        ``guidance`` (7.5), ``width`` (1024), ``height`` (1024).

        Returns a list of per-prompt dicts: successes carry
        ``prompt``/``filename``/``filepath``/``status``; failures carry
        ``prompt``/``status``/``error``.
        """
        os.makedirs(output_dir, exist_ok=True)
        outcomes = []
        for idx, prompt in enumerate(prompts):
            print(f"Processing {idx+1}/{len(prompts)}: {prompt[:50]}...")
            try:
                rendered = self.pipe(
                    prompt,
                    num_inference_steps=kwargs.get('steps', 4),
                    guidance_scale=kwargs.get('guidance', 7.5),
                    width=kwargs.get('width', 1024),
                    height=kwargs.get('height', 1024),
                ).images[0]
                # Timestamp plus index keeps filenames unique within a run.
                stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                name = f"{stamp}_{idx:03d}.jpg"
                path = os.path.join(output_dir, name)
                rendered.save(path, quality=95)
                outcomes.append({
                    "prompt": prompt,
                    "filename": name,
                    "filepath": path,
                    "status": "success",
                })
            except Exception as e:
                # A single bad prompt must not abort the rest of the batch.
                print(f"Error processing prompt {idx+1}: {e}")
                outcomes.append({
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e),
                })
        return outcomes
# Usage example: render three prompts into the "batch_output" directory.
processor = FluxBatchProcessor()
prompts = [
    "Professional business headshot of confident executive",
    "Modern office interior with natural lighting",
    "Product photography of smartphone on white background",
]
results = processor.process_batch(prompts, output_dir="batch_output")
Advanced Batch Processing Strategies
Multi-GPU Batch Processing
Leverage multiple GPUs for maximum throughput:
import torch
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
class MultiGPUBatchProcessor:
    """Fan a prompt list out across multiple CUDA devices.

    Each GPU gets its own worker *process* with its own pipeline instance;
    per-worker result lists are merged back into a single list.
    """

    def __init__(self, num_gpus=None):
        """Use *num_gpus* devices, defaulting to every visible CUDA device."""
        self.num_gpus = num_gpus or torch.cuda.device_count()

    def process_on_gpu(self, gpu_id, prompts, output_dir, **kwargs):
        """Worker entry point: load a pipeline on ``cuda:{gpu_id}`` and render *prompts*.

        Runs in a child process. *kwargs* are forwarded verbatim to the
        pipeline call, so they must use pipeline parameter names
        (e.g. ``num_inference_steps``, not ``steps``).
        """
        # Bug fix: the parallel path never created the output directory,
        # so image.save() failed on a fresh run.
        os.makedirs(output_dir, exist_ok=True)
        torch.cuda.set_device(gpu_id)
        pipe = FluxPipeline.from_pretrained(
            "black-forest-labs/FLUX.1-krea-dev",
            torch_dtype=torch.float16
        )
        pipe.to(f"cuda:{gpu_id}")
        results = []
        for i, prompt in enumerate(prompts):
            try:
                image = pipe(prompt, **kwargs).images[0]
                # gpu_id prefix keeps filenames unique across workers.
                filename = f"gpu{gpu_id}_{i:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                results.append({"prompt": prompt, "filename": filename})
            except Exception as e:
                results.append({"prompt": prompt, "error": str(e)})
        return results

    def process_batch_parallel(self, prompts, output_dir="output", **kwargs):
        """Split *prompts* round-robin across GPUs and process the chunks in parallel.

        Returns the concatenated per-prompt result dicts from all workers.
        """
        # Round-robin distribution: chunk i holds prompts i, i+n, i+2n, ...
        chunks = [prompts[i::self.num_gpus] for i in range(self.num_gpus)]
        # Bug fix: CUDA cannot be re-initialised in a fork()ed child; the
        # default start method on Linux is "fork", which crashes as soon as
        # a worker touches the GPU. Force the "spawn" start method.
        ctx = mp.get_context("spawn")
        with ProcessPoolExecutor(max_workers=self.num_gpus, mp_context=ctx) as executor:
            futures = [
                executor.submit(self.process_on_gpu, i, chunks[i], output_dir, **kwargs)
                for i in range(self.num_gpus) if chunks[i]
            ]
            all_results = []
            for future in futures:
                all_results.extend(future.result())
        return all_results
CSV and Data-Driven Batch Processing
Processing from Spreadsheets
Enable non-technical users to manage batch jobs through CSV files:
import pandas as pd
import json
class CSVBatchProcessor(FluxBatchProcessor):
    """Drive batch generation from a CSV file, one row per image.

    Expected columns: ``prompt`` (required) plus optional ``width``,
    ``height``, ``steps``, ``guidance``, ``seed`` and ``filename``.
    Missing columns and empty (NaN) cells fall back to defaults.
    """

    def process_csv(self, csv_path, output_dir="output"):
        """Generate one image per CSV row and write a ``batch_results.csv`` report.

        Returns the list of per-row result dicts (the same data saved in
        the report).
        """
        df = pd.read_csv(csv_path)
        # Bug fix: the output directory was never created on this path.
        os.makedirs(output_dir, exist_ok=True)
        results = []
        for index, row in df.iterrows():
            prompt = row['prompt']

            def _cell(name, default):
                # Treat a missing column and an empty cell (NaN) the same way.
                value = row.get(name)
                return default if value is None or pd.isna(value) else value

            width = int(_cell('width', 1024))
            height = int(_cell('height', 1024))
            steps = int(_cell('steps', 4))
            guidance = float(_cell('guidance', 7.5))

            # Bug fix: the pipeline expects `num_inference_steps` and
            # `guidance_scale`; passing `steps`/`guidance` straight through
            # raised TypeError on every row.
            params = {
                'width': width,
                'height': height,
                'num_inference_steps': steps,
                'guidance_scale': guidance,
            }
            # Optional seed for reproducibility.
            seed = None
            if 'seed' in row and pd.notna(row['seed']):
                seed = int(row['seed'])
                params['generator'] = torch.Generator().manual_seed(seed)
            print(f"Processing row {index+1}: {prompt[:50]}...")
            try:
                image = self.pipe(prompt, **params).images[0]
                # Use custom filename from CSV if provided (NaN-safe).
                filename = _cell('filename', f"image_{index:03d}.jpg")
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                # Record plain values (not the generator object) so the
                # report serialises cleanly to CSV.
                results.append({
                    "row": index,
                    "prompt": prompt,
                    "filename": filename,
                    "status": "success",
                    "width": width,
                    "height": height,
                    "steps": steps,
                    "guidance": guidance,
                    "seed": seed,
                })
            except Exception as e:
                results.append({
                    "row": index,
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e),
                })
        # Save results report next to the images.
        results_df = pd.DataFrame(results)
        results_df.to_csv(os.path.join(output_dir, "batch_results.csv"), index=False)
        return results
# Example CSV format:
# prompt,width,height,steps,guidance,seed,filename
# "Professional headshot",1024,1024,4,7.5,12345,"headshot_001.jpg"
# "Office interior",1024,768,8,8.0,67890,"office_001.jpg"
Queue-Based Processing Systems
Job Queue Implementation
Build scalable queue systems for enterprise-level batch processing:
import queue
import threading
import time
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class GenerationJob:
    """A single image-generation request for the job queue.

    ``priority`` follows PriorityQueue semantics: lower numbers are served
    first (the queue entry is ``(priority, enqueue_time, job)``).
    """
    id: str                             # unique job identifier; also the output filename stem
    prompt: str                         # text prompt to render
    width: int = 1024                   # output width in pixels
    height: int = 1024                  # output height in pixels
    steps: int = 4                      # number of inference steps
    guidance: float = 7.5               # guidance scale passed to the pipeline
    seed: Optional[int] = None          # fixed RNG seed for reproducibility
    callback_url: Optional[str] = None  # optional callback endpoint (unused by the workers shown here)
    priority: int = 0                   # queue priority; lower = sooner
class QueuedBatchProcessor:
    """Priority-queue based batch processor with a pool of worker threads.

    Jobs are submitted via :meth:`add_job`; each worker owns its own
    pipeline instance and records outcomes in ``self.results`` keyed by
    job id.
    """

    def __init__(self, num_workers=2):
        self.job_queue = queue.PriorityQueue()  # entries: (priority, enqueue_time, job)
        self.results = {}       # job_id -> status dict
        self.workers = []
        self.num_workers = num_workers
        self.running = False    # flag checked by worker loops

    def add_job(self, job: "GenerationJob"):
        """Queue *job*; lower ``priority`` numbers are processed first.

        The enqueue timestamp breaks priority ties so equal-priority
        entries never fall through to comparing (unorderable) job objects.
        """
        self.job_queue.put((job.priority, time.time(), job))

    def worker_thread(self, worker_id):
        """Worker loop: load a pipeline once, then drain jobs until stopped."""
        pipe = FluxPipeline.from_pretrained(
            "black-forest-labs/FLUX.1-krea-dev",
            torch_dtype=torch.float16
        )
        pipe.to("cuda")
        while self.running:
            # Fetch in its own try so only queue.Empty is swallowed here;
            # the previous layout could hit an undefined `job` in the
            # generic handler if get() itself failed.
            try:
                priority, timestamp, job = self.job_queue.get(timeout=1)
            except queue.Empty:
                continue
            print(f"Worker {worker_id} processing job {job.id}")
            try:
                params = {
                    'num_inference_steps': job.steps,
                    'guidance_scale': job.guidance,
                    'width': job.width,
                    'height': job.height
                }
                # Bug fix: `if job.seed:` skipped the perfectly valid seed 0.
                if job.seed is not None:
                    params['generator'] = torch.Generator().manual_seed(job.seed)
                image = pipe(job.prompt, **params).images[0]
                # Save result. Bug fix: the save path previously ignored
                # the computed filename entirely.
                filename = f"{job.id}.jpg"
                os.makedirs("output", exist_ok=True)
                image.save(os.path.join("output", filename))
                self.results[job.id] = {
                    "status": "completed",
                    "filename": filename,
                    "prompt": job.prompt,
                    "worker_id": worker_id
                }
            except Exception as e:
                self.results[job.id] = {
                    "status": "error",
                    "error": str(e),
                    "worker_id": worker_id
                }
            finally:
                # Bug fix: task_done() was skipped on errors, which made
                # job_queue.join() hang forever after any failure.
                self.job_queue.task_done()

    def start_workers(self):
        """Spawn ``num_workers`` worker threads."""
        self.running = True
        for i in range(self.num_workers):
            worker = threading.Thread(target=self.worker_thread, args=(i,))
            worker.start()
            self.workers.append(worker)

    def stop_workers(self):
        """Signal workers to exit and wait for them to finish."""
        self.running = False
        for worker in self.workers:
            worker.join()
        self.workers.clear()

    def get_job_status(self, job_id):
        """Return the recorded result for *job_id*, or a 'queued' placeholder."""
        return self.results.get(job_id, {"status": "queued"})
Template-Based Automation
Prompt Template System
Automate prompt generation for consistent output across variations:
class PromptTemplateProcessor:
    """Build prompt variations from named format-string templates."""

    def __init__(self):
        # Each template's {placeholders} are filled from a variation dict.
        self.templates = {
            "product_photo": "Professional product photography of {product} on {background}, {lighting} lighting, high resolution, commercial quality",
            "portrait": "Professional {style} portrait of {subject}, {age} years old, {expression}, {lighting}, shot with {lens}",
            "interior": "{style} {room} interior with {lighting}, {color_scheme} color scheme, {furniture}, architectural photography"
        }

    def generate_variations(self, template_name, variations):
        """Expand the named template once per variation dict.

        Raises ValueError when *template_name* is unknown.
        """
        template = self.templates.get(template_name)
        if not template:
            raise ValueError(f"Template {template_name} not found")
        return [template.format(**fields) for fields in variations]

    def process_template_batch(self, template_name, variations, output_dir):
        """Generate an image per variation and tag each result with its inputs."""
        expanded = self.generate_variations(template_name, variations)
        batch_runner = FluxBatchProcessor()
        outcomes = batch_runner.process_batch(expanded, output_dir)
        for position, outcome in enumerate(outcomes):
            outcome['variation_data'] = variations[position]
        return outcomes
# Example usage: three product-photo variations rendered to "product_output".
template_processor = PromptTemplateProcessor()
product_variations = [
    {"product": "smartphone", "background": "white background", "lighting": "studio"},
    {"product": "laptop", "background": "wood surface", "lighting": "natural"},
    {"product": "headphones", "background": "black background", "lighting": "dramatic"},
]
results = template_processor.process_template_batch(
    "product_photo", product_variations, "product_output"
)
Quality Control and Filtering
Automated Quality Assessment
Implement quality checks to filter and sort batch results:
import cv2
import numpy as np
from PIL import Image
class QualityFilterProcessor(FluxBatchProcessor):
    """Batch processor that scores generated images and filters low-quality ones."""

    def assess_image_quality(self, image_path):
        """Compute simple no-reference quality metrics for the image at *image_path*.

        Returns a dict with ``sharpness`` (Laplacian variance), ``brightness``
        (mean gray level), ``contrast`` (gray std-dev) and a combined
        heuristic ``quality_score``.

        Raises FileNotFoundError when the file is missing or unreadable.
        """
        img = cv2.imread(image_path)
        if img is None:
            # Bug fix: cv2.imread returns None on failure instead of raising,
            # which previously crashed cvtColor with a cryptic error.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Laplacian variance: higher means sharper edges.
        sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
        brightness = np.mean(gray)
        contrast = gray.std()
        # Heuristic score only; the +1 avoids division by zero on an
        # all-black image.
        quality_score = (sharpness * contrast) / (brightness + 1)
        return {
            "sharpness": float(sharpness),
            "brightness": float(brightness),
            "contrast": float(contrast),
            "quality_score": float(quality_score)
        }

    def process_with_quality_filter(self, prompts, output_dir, min_quality=100):
        """Generate all prompts, then keep only results scoring >= *min_quality*.

        Failed generations are dropped from the returned list; rejected
        images stay on disk but their result dict is flagged
        ``quality_passed = False`` before being discarded.
        """
        # Generate all images first.
        results = self.process_batch(prompts, output_dir)
        # Assess quality and filter.
        filtered_results = []
        for result in results:
            if result.get("status") != "success":
                continue
            quality = self.assess_image_quality(result["filepath"])
            result["quality_metrics"] = quality
            if quality["quality_score"] >= min_quality:
                result["quality_passed"] = True
                filtered_results.append(result)
            else:
                result["quality_passed"] = False
                # Optionally delete low-quality images:
                # os.remove(result["filepath"])
        return filtered_results
Monitoring and Progress Tracking
Real-time Progress Monitoring
import json
import time
from datetime import datetime
class MonitoredBatchProcessor(FluxBatchProcessor):
    """Batch processor that mirrors its progress to a JSON file for dashboards."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.progress_file = "batch_progress.json"  # polled by external monitors

    def update_progress(self, current, total, status="processing", extra_data=None):
        """Overwrite the progress file with the current batch state."""
        # Bug fix: an empty batch (total == 0) previously raised
        # ZeroDivisionError; report 100% for a zero-item batch instead.
        percentage = round((current / total) * 100, 2) if total else 100.0
        progress_data = {
            "timestamp": datetime.now().isoformat(),
            "current": current,
            "total": total,
            "percentage": percentage,
            "status": status,
            "estimated_remaining": self.estimate_remaining_time(current, total),
            "extra_data": extra_data or {}
        }
        with open(self.progress_file, 'w') as f:
            json.dump(progress_data, f, indent=2)

    def estimate_remaining_time(self, current, total):
        """Linear ETA from throughput so far, formatted for direct display."""
        if not hasattr(self, 'start_time') or current == 0:
            return "calculating..."
        elapsed = time.time() - self.start_time
        rate = current / elapsed
        remaining_items = total - current
        remaining_seconds = remaining_items / rate if rate > 0 else 0
        return f"{remaining_seconds:.0f} seconds"

    def process_batch_monitored(self, prompts, output_dir="output", **kwargs):
        """Like ``process_batch`` but updates the progress file per prompt.

        Note: *kwargs* are forwarded verbatim to the pipeline here, so use
        pipeline parameter names (``num_inference_steps``, ``guidance_scale``).
        """
        self.start_time = time.time()
        total = len(prompts)
        results = []
        os.makedirs(output_dir, exist_ok=True)
        for i, prompt in enumerate(prompts):
            self.update_progress(i, total, "processing", {
                "current_prompt": prompt[:50] + "..." if len(prompt) > 50 else prompt
            })
            try:
                image = self.pipe(prompt, **kwargs).images[0]
                filename = f"image_{i:03d}.jpg"
                filepath = os.path.join(output_dir, filename)
                image.save(filepath)
                results.append({
                    "prompt": prompt,
                    "filename": filename,
                    "filepath": filepath,
                    "status": "success"
                })
            except Exception as e:
                results.append({
                    "prompt": prompt,
                    "status": "error",
                    "error": str(e)
                })
        self.update_progress(total, total, "completed")
        return results
Integration with External Systems
Web API for Batch Processing
from flask import Flask, request, jsonify
import uuid
import threading
# Module-level singletons shared by all request handlers.
app = Flask(__name__)
# NOTE(review): worker threads (and their model loads) start eagerly at
# import time — the first HTTP request does not pay the load cost, but
# importing this module has heavy side effects.
batch_processor = QueuedBatchProcessor(num_workers=2)
batch_processor.start_workers()
@app.route('/api/batch/submit', methods=['POST'])
def submit_batch():
    """Accept a JSON body ``{"prompts": [...], ...}`` and enqueue one job per prompt.

    Optional body fields ``width``/``height``/``steps``/``guidance`` apply
    to every job in the batch. Responds with the batch id, per-job ids and
    a "submitted" status; 400 when no prompts are supplied.
    """
    # Robustness fix: request.json errors out on a missing or non-JSON
    # body; fall back to {} so such requests get the 400, not a 500.
    data = request.get_json(silent=True) or {}
    prompts = data.get('prompts', [])
    if not prompts:
        return jsonify({"error": "No prompts provided"}), 400
    batch_id = str(uuid.uuid4())
    job_ids = []
    for i, prompt in enumerate(prompts):
        # Job ids share the batch-id prefix so status lookups can match them.
        job_id = f"{batch_id}_{i:03d}"
        job = GenerationJob(
            id=job_id,
            prompt=prompt,
            width=data.get('width', 1024),
            height=data.get('height', 1024),
            steps=data.get('steps', 4),
            guidance=data.get('guidance', 7.5)
        )
        batch_processor.add_job(job)
        job_ids.append(job_id)
    return jsonify({
        "batch_id": batch_id,
        "job_ids": job_ids,
        "status": "submitted"
    })
# Bug fix: the route was missing the <batch_id> placeholder
# ('/api/batch//status'), so Flask could never bind the handler's
# batch_id parameter.
@app.route('/api/batch/<batch_id>/status', methods=['GET'])
def get_batch_status(batch_id):
    """Summarise the state of every finished or failed job in the batch.

    Note: jobs still waiting in the queue have no entry in
    ``batch_processor.results`` yet, so ``total_jobs`` only counts jobs a
    worker has already picked up and recorded.
    """
    # Job ids are prefixed with the batch id (see submit_batch).
    job_statuses = {
        key: result
        for key, result in batch_processor.results.items()
        if key.startswith(batch_id)
    }
    completed = sum(1 for r in job_statuses.values() if r.get("status") == "completed")
    return jsonify({
        "batch_id": batch_id,
        "total_jobs": len(job_statuses),
        "completed_jobs": completed,
        "job_statuses": job_statuses
    })
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug reloader and the
    # interactive debugger — development only; never run this way in
    # production.
    app.run(debug=True)
Performance Optimization for Batch Processing
Memory Management
Optimize memory usage for large batch operations:
- Model Caching: Load model once, reuse across generations
- Batch Size Tuning: Find optimal batch size for your GPU
- Memory Cleanup: Clear GPU memory between batches
- CPU Offloading: Move models to CPU when not in use
Production Deployment Considerations
Scalability and Reliability
- Load Balancing: Distribute work across multiple servers
- Error Handling: Robust error recovery and retry mechanisms
- Monitoring: Comprehensive logging and alerting
- Resource Management: Automatic scaling based on queue size
- Data Persistence: Reliable storage for jobs and results
Conclusion
Effective batch processing and automation transforms Flux Krea from a single-image tool into a production-ready system capable of handling enterprise-scale image generation requirements. The techniques covered in this guide provide the foundation for building sophisticated, scalable AI image generation workflows.
Start with simple batch scripts and gradually implement more advanced features like multi-GPU processing, quality filtering, and API integration as your needs grow. Remember to monitor performance and optimize for your specific hardware and use case requirements.