| import comfy.sd |
| import comfy.clip_vision |
| import folder_paths |
| import comfy.utils |
| import torch |
| import random |
| from datetime import datetime |
| import random |
| import gc |
| import os |
| import json |
| import re |
|
|
| from .hard.mg_upscale_module import clear_gpu_and_ram_cache |
|
|
| |
| _checkpoint_cache = {} |
| _loaded_checkpoint = None |
| _lora_cache = {} |
| _active_lora_names = set() |
|
|
|
|
| def _clear_unused_loras(active_names): |
| """Remove unused LoRAs from cache and clear GPU memory.""" |
| unused = [n for n in _lora_cache if n not in active_names] |
| for n in unused: |
| del _lora_cache[n] |
| if unused: |
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
|
|
|
|
| def _load_checkpoint(path): |
| """Load checkpoint from cache or disk.""" |
| if path in _checkpoint_cache: |
| return _checkpoint_cache[path] |
| model, clip, vae = comfy.sd.load_checkpoint_guess_config( |
| path, |
| output_vae=True, |
| output_clip=True, |
| embedding_directory=folder_paths.get_folder_paths("embeddings"), |
| )[:3] |
| _checkpoint_cache[path] = (model, clip, vae) |
| return model, clip, vae |
|
|
|
|
| def _unload_old_checkpoint(current_path): |
| """Unload checkpoint if it's different from the current one.""" |
| global _loaded_checkpoint |
| if _loaded_checkpoint and _loaded_checkpoint != current_path: |
| _checkpoint_cache.pop(_loaded_checkpoint, None) |
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
| _loaded_checkpoint = current_path |
|
|
|
|
|
|
| class MagicNodesCombiNode: |
| @classmethod |
| def INPUT_TYPES(cls): |
| def _loras_with_none(): |
| try: |
| return ["None"] + list(folder_paths.get_filename_list("loras")) |
| except Exception: |
| return ["None"] |
|
|
| return { |
| "required": { |
| |
|
|
|
|
| |
| "use_checkpoint": ("BOOLEAN", {"default": True}), |
| "checkpoint": (folder_paths.get_filename_list("checkpoints"), {}), |
| "clear_cache": ("BOOLEAN", {"default": False}), |
|
|
| |
| "use_lora_1": ("BOOLEAN", {"default": True}), |
| "lora_1": (_loras_with_none(), {}), |
| "strength_model_1": ("FLOAT", {"default": 1.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_1": ("FLOAT", {"default": 1.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
|
|
| |
| "use_lora_2": ("BOOLEAN", {"default": False}), |
| "lora_2": (_loras_with_none(), {}), |
| "strength_model_2": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_2": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
|
|
| |
| "use_lora_3": ("BOOLEAN", {"default": False}), |
| "lora_3": (_loras_with_none(), {}), |
| "strength_model_3": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_3": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
|
|
| |
| "use_lora_4": ("BOOLEAN", {"default": False}), |
| "lora_4": (_loras_with_none(), {}), |
| "strength_model_4": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_4": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
|
|
| |
| "use_lora_5": ("BOOLEAN", {"default": False}), |
| "lora_5": (_loras_with_none(), {}), |
| "strength_model_5": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_5": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
|
|
| |
| "use_lora_6": ("BOOLEAN", {"default": False}), |
| "lora_6": (_loras_with_none(), {}), |
| "strength_model_6": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| "strength_clip_6": ("FLOAT", {"default": 0.0, "min": -1.5, "max": 1.5, "step": 0.01,}), |
| }, |
| "optional": { |
| "model_in": ("MODEL", {}), |
| "clip_in": ("CLIP", {}), |
| "vae_in": ("VAE", {}), |
|
|
| |
| "positive_prompt": ("STRING", {"multiline": True, "default": "", "dynamicPrompts": False}), |
| "negative_prompt": ("STRING", {"multiline": True, "default": "", "dynamicPrompts": False}), |
|
|
| |
| "positive_in": ("CONDITIONING", {}), |
| "negative_in": ("CONDITIONING", {}), |
|
|
| |
| "clip_set_last_layer_positive": ("INT", {"default": -2, "min": -20, "max": 0}), |
| "clip_set_last_layer_negative": ("INT", {"default": -2, "min": -20, "max": 0}), |
|
|
| |
| "recipe_slot": (["Off", "Slot 1", "Slot 2", "Slot 3", "Slot 4"], {"default": "Off", "tooltip": "Choose slot to save/load assembled setup."}), |
| "recipe_save": ("BOOLEAN", {"default": False, "tooltip": "Save current setup into the selected slot."}), |
| "recipe_use": ("BOOLEAN", {"default": False, "tooltip": "Load and apply setup from the selected slot for this run."}), |
|
|
| |
| "standard_pipeline": ("BOOLEAN", {"default": False, "tooltip": "Use vanilla order for CLIP: Set Last Layer -> Load LoRA -> Encode (same CLIP logic as standard ComfyUI)."}), |
|
|
| |
| "clip_lora_pos_gain": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiplier for CLIP-LoRA strength on positive branch (standard pipeline)."}), |
| "clip_lora_neg_gain": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiplier for CLIP-LoRA strength on negative branch (standard pipeline)."}), |
|
|
| |
| "dynamic_pos": ("BOOLEAN", {"default": False, "tooltip": "Deterministically expand choices in positive prompt (uses dyn_seed)."}), |
| "dynamic_neg": ("BOOLEAN", {"default": False, "tooltip": "Deterministically expand choices in negative prompt (uses dyn_seed)."}), |
| "dyn_seed": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFFFF, "tooltip": "Seed for dynamic prompt expansion (same seed used for both prompts)."}), |
| "dynamic_break_freeze": ("BOOLEAN", {"default": True, "tooltip": "If enabled, do not expand choices before the first |BREAK| marker; dynamic applies only after it."}), |
| "show_expanded_prompts": ("BOOLEAN", {"default": False, "tooltip": "Print expanded Positive/Negative prompts to console when dynamic is enabled."}), |
| "save_expanded_prompts": ("BOOLEAN", {"default": False, "tooltip": "Save expanded prompts to mod/dynPrompt/SEED_dd_mm_yyyy.txt when dynamic is enabled."}), |
| } |
| } |
|
|
|
|
| RETURN_TYPES = ("MODEL", "CLIP", "CONDITIONING", "CONDITIONING", "VAE") |
| RETURN_NAMES = ("MODEL", "CLIP", "Positive", "Negative", "VAE") |
| |
| |
| FUNCTION = "apply_magic_node" |
| CATEGORY = "MagicNodes" |
|
|
| def apply_magic_node(self, model_in=None, clip_in=None, checkpoint=None, |
| use_checkpoint=True, clear_cache=False, |
| use_lora_1=True, lora_1=None, strength_model_1=1.0, strength_clip_1=1.0, |
| use_lora_2=False, lora_2=None, strength_model_2=0.0, strength_clip_2=0.0, |
| use_lora_3=False, lora_3=None, strength_model_3=0.0, strength_clip_3=0.0, |
| use_lora_4=False, lora_4=None, strength_model_4=0.0, strength_clip_4=0.0, |
| use_lora_5=False, lora_5=None, strength_model_5=0.0, strength_clip_5=0.0, |
| use_lora_6=False, lora_6=None, strength_model_6=0.0, strength_clip_6=0.0, |
| positive_prompt="", negative_prompt="", |
| clip_set_last_layer_positive=-2, clip_set_last_layer_negative=-2, |
| vae_in=None, |
| recipe_slot="Off", recipe_save=False, recipe_use=False, |
| standard_pipeline=False, |
| clip_lora_pos_gain=1.0, clip_lora_neg_gain=1.0, |
| positive_in=None, negative_in=None, |
| dynamic_pos=False, dynamic_neg=False, dyn_seed=0, dynamic_break_freeze=True, |
| show_expanded_prompts=False, save_expanded_prompts=False): |
|
|
| global _loaded_checkpoint |
|
|
| |
| _checkpoint_cache.clear() |
| if clear_cache: |
| _lora_cache.clear() |
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
|
|
| |
| def _recipes_path(): |
| base = os.path.join(os.path.dirname(__file__), "state") |
| os.makedirs(base, exist_ok=True) |
| return os.path.join(base, "combinode_recipes.json") |
| def _recipes_load(): |
| try: |
| with open(_recipes_path(), "r", encoding="utf-8") as f: |
| return json.load(f) |
| except Exception: |
| return {} |
| def _recipes_save(data: dict): |
| try: |
| with open(_recipes_path(), "w", encoding="utf-8") as f: |
| json.dump(data, f, ensure_ascii=False, indent=2) |
| except Exception: |
| pass |
|
|
| |
| slot_idx = {"Off": 0, "Slot 1": 1, "Slot 2": 2, "Slot 3": 3, "Slot 4": 4}.get(str(recipe_slot), 0) |
| if slot_idx and bool(recipe_use): |
| rec = _recipes_load().get(str(slot_idx), None) |
| if rec is not None: |
| try: |
| use_checkpoint = rec.get("use_checkpoint", use_checkpoint) |
| checkpoint = rec.get("checkpoint", checkpoint) |
| clip_set_last_layer_positive = rec.get("clip_pos", clip_set_last_layer_positive) |
| clip_set_last_layer_negative = rec.get("clip_neg", clip_set_last_layer_negative) |
| positive_prompt = rec.get("pos_text", positive_prompt) |
| negative_prompt = rec.get("neg_text", negative_prompt) |
| rls = rec.get("loras", []) |
| if len(rls) >= 4: |
| (use_lora_1, lora_1, strength_model_1, strength_clip_1) = rls[0] |
| (use_lora_2, lora_2, strength_model_2, strength_clip_2) = rls[1] |
| (use_lora_3, lora_3, strength_model_3, strength_clip_3) = rls[2] |
| (use_lora_4, lora_4, strength_model_4, strength_clip_4) = rls[3] |
| if len(rls) >= 5: |
| (use_lora_5, lora_5, strength_model_5, strength_clip_5) = rls[4] |
| if len(rls) >= 6: |
| (use_lora_6, lora_6, strength_model_6, strength_clip_6) = rls[5] |
| print(f"[CombiNode] Loaded recipe Slot {slot_idx}.") |
| except Exception: |
| print(f"[CombiNode] Failed to apply recipe Slot {slot_idx}.") |
|
|
| |
| def _norm_prompt(s: str) -> str: |
| if not isinstance(s, str) or not s: |
| return s or "" |
| s2 = s.replace("\r", " ").replace("\n", " ") |
| s2 = re.sub(r"\s+", " ", s2) |
| s2 = re.sub(r"\s*,\s*", ", ", s2) |
| s2 = re.sub(r"(,\s*){2,}", ", ", s2) |
| return s2.strip() |
|
|
| |
| def _expand_dynamic(text: str, seed_val: int, freeze_before_break: bool = True) -> str: |
| if not isinstance(text, str) or (text.find('|') < 0): |
| return text |
| |
| if freeze_before_break and ('|BREAK|' in text): |
| pre, post = text.split('|BREAK|', 1) |
| return pre + '|BREAK|' + _expand_dynamic(post, seed_val, freeze_before_break=False) |
| rng = random.Random(int(seed_val) & 0xFFFFFFFF) |
| def _expand_pattern(t: str, pat: re.Pattern) -> str: |
| prev = None |
| cur = t |
| while prev != cur: |
| prev = cur |
| def repl(m): |
| body = m.group(1) |
| choices = [c.strip() for c in body.split('|') if c.strip()] |
| if not choices: |
| return m.group(0) |
| return rng.choice(choices) |
| cur = pat.sub(repl, cur) |
| return cur |
| for rx in ( |
| re.compile(r"\{([^{}]+)\}"), |
| re.compile(r"\(([^()]+)\)"), |
| re.compile(r"\[([^\[\]]+)\]"), |
| ): |
| text = _expand_pattern(text, rx) |
| return text |
|
|
| |
| pos_text_expanded = _norm_prompt(_expand_dynamic(positive_prompt, int(dyn_seed), bool(dynamic_break_freeze)) if bool(dynamic_pos) else positive_prompt) |
| neg_text_expanded = _norm_prompt(_expand_dynamic(negative_prompt, int(dyn_seed), bool(dynamic_break_freeze)) if bool(dynamic_neg) else negative_prompt) |
|
|
| def _valid_vae(v): |
| try: |
| return (v is not None) and (getattr(v, "first_stage_model", None) is not None) |
| except Exception: |
| return False |
|
|
| if use_checkpoint and checkpoint: |
| checkpoint_path = folder_paths.get_full_path_or_raise("checkpoints", checkpoint) |
| _unload_old_checkpoint(checkpoint_path) |
| base_model, base_clip, vae = _load_checkpoint(checkpoint_path) |
| model = base_model.clone() |
| |
| clip_source = base_clip or clip_in |
| if clip_source is None: |
| raise Exception("Checkpoint has no CLIP. Connect a CLIP input node or use a checkpoint that bundles CLIP.") |
| clip = clip_source.clone() |
| clip_clean = clip_source.clone() |
| |
| for candidate in (vae_in, vae): |
| if _valid_vae(candidate): |
| vae = candidate |
| break |
| else: |
| raise Exception("Checkpoint has no valid VAE. Connect a VAE input node or use a checkpoint that bundles VAE.") |
|
|
| elif model_in and clip_in: |
| _unload_old_checkpoint(None) |
| model = model_in.clone() |
| clip = clip_in.clone() |
| clip_clean = clip_in.clone() |
| vae = vae_in |
| if not _valid_vae(vae): |
| raise Exception("VAE input is missing or invalid. Please connect a proper VAE node.") |
| else: |
| raise Exception("No model selected!") |
|
|
| |
|
|
| |
| loras = [ |
| (use_lora_1, lora_1, strength_model_1, strength_clip_1), |
| (use_lora_2, lora_2, strength_model_2, strength_clip_2), |
| (use_lora_3, lora_3, strength_model_3, strength_clip_3), |
| (use_lora_4, lora_4, strength_model_4, strength_clip_4), |
| (use_lora_5, lora_5, strength_model_5, strength_clip_5), |
| (use_lora_6, lora_6, strength_model_6, strength_clip_6), |
| ] |
|
|
| active_lora_paths = [] |
| lora_stack = [] |
| defer_clip = bool(standard_pipeline) |
| for use_lora, lora_name, sm, sc in loras: |
| |
| name = str(lora_name).strip() if lora_name is not None else "" |
| if (not bool(use_lora)) or (name == "") or (name.lower() in ("none", "null", "off")): |
| continue |
| |
| try: |
| lora_path = folder_paths.get_full_path("loras", name) |
| except Exception: |
| lora_path = None |
| if (not lora_path) or (not os.path.exists(lora_path)): |
| continue |
| active_lora_paths.append(lora_path) |
| |
| if lora_path in _lora_cache: |
| lora_file = _lora_cache[lora_path] |
| else: |
| lora_file = comfy.utils.load_torch_file(lora_path, safe_load=True) |
| _lora_cache[lora_path] = lora_file |
| lora_stack.append((lora_file, float(sc), float(sm))) |
| sc_apply = 0.0 if defer_clip else sc |
| model, clip = comfy.sd.load_lora_for_models(model, clip, lora_file, sm, sc_apply) |
|
|
| _clear_unused_loras(active_lora_paths) |
| |
| try: |
| counts = {} |
| for p in active_lora_paths: |
| counts[p] = counts.get(p, 0) + 1 |
| dups = [p for p, c in counts.items() if c > 1] |
| if dups: |
| print(f"[CombiNode] Duplicate LoRA detected across slots: {len(dups)} file(s).") |
| except Exception: |
| pass |
|
|
| |
| |
| |
| src_clip = clip_clean if bool(standard_pipeline) else clip |
|
|
| pos_gain = float(clip_lora_pos_gain) |
| neg_gain = float(clip_lora_neg_gain) |
| skips_equal = int(clip_set_last_layer_positive) == int(clip_set_last_layer_negative) |
| |
| use_shared = bool(standard_pipeline) and skips_equal and (abs(pos_gain - neg_gain) < 1e-6) |
|
|
| if (positive_in is None) and (negative_in is None) and use_shared: |
| shared_clip = src_clip.clone() |
| shared_clip.clip_layer(clip_set_last_layer_positive) |
| for lora_file, sc, sm in lora_stack: |
| try: |
| _m_unused, shared_clip = comfy.sd.load_lora_for_models(model, shared_clip, lora_file, 0.0, sc * pos_gain) |
| except Exception: |
| pass |
| tokens_pos = shared_clip.tokenize(pos_text_expanded) |
| cond_pos = shared_clip.encode_from_tokens_scheduled(tokens_pos) |
| tokens_neg = shared_clip.tokenize(neg_text_expanded) |
| cond_neg = shared_clip.encode_from_tokens_scheduled(tokens_neg) |
| else: |
| |
| clip_pos = src_clip.clone() |
| clip_pos.clip_layer(clip_set_last_layer_positive) |
| if bool(standard_pipeline): |
| for lora_file, sc, sm in lora_stack: |
| try: |
| _m_unused, clip_pos = comfy.sd.load_lora_for_models(model, clip_pos, lora_file, 0.0, sc * pos_gain) |
| except Exception: |
| pass |
| if positive_in is not None: |
| cond_pos = positive_in |
| else: |
| tokens_pos = clip_pos.tokenize(pos_text_expanded) |
| cond_pos = clip_pos.encode_from_tokens_scheduled(tokens_pos) |
|
|
| |
| clip_neg = src_clip.clone() |
| clip_neg.clip_layer(clip_set_last_layer_negative) |
| if bool(standard_pipeline): |
| for lora_file, sc, sm in lora_stack: |
| try: |
| _m_unused, clip_neg = comfy.sd.load_lora_for_models(model, clip_neg, lora_file, 0.0, sc * neg_gain) |
| except Exception: |
| pass |
| if negative_in is not None: |
| cond_neg = negative_in |
| else: |
| tokens_neg = clip_neg.tokenize(neg_text_expanded) |
| cond_neg = clip_neg.encode_from_tokens_scheduled(tokens_neg) |
|
|
| |
| dyn_used = bool(dynamic_pos) or bool(dynamic_neg) |
| if dyn_used and (bool(show_expanded_prompts) or bool(save_expanded_prompts)): |
| |
| if bool(show_expanded_prompts): |
| try: |
| print(f"[CombiNode] Expanded prompts (dyn_seed={int(dyn_seed)}):") |
| def _print_block(name, src, expanded): |
| print(name + ":") |
| if bool(dynamic_break_freeze) and ('|BREAK|' in src) and ((name=="Positive" and bool(dynamic_pos)) or (name=="Negative" and bool(dynamic_neg))): |
| print(" static") |
| print(" " + expanded) |
| _print_block("Positive", positive_prompt, pos_text_expanded) |
| _print_block("Negative", negative_prompt, neg_text_expanded) |
| except Exception: |
| pass |
| |
| if bool(save_expanded_prompts): |
| try: |
| base = os.path.join(os.path.dirname(__file__), "dynPrompt") |
| os.makedirs(base, exist_ok=True) |
| now = datetime.now() |
| fname = f"{int(dyn_seed)}_{now.day:02d}_{now.month:02d}_{now.year}.txt" |
| path = os.path.join(base, fname) |
| lines = [] |
| def _append_block(name, src, expanded): |
| lines.append(name + ":\n") |
| if bool(dynamic_break_freeze) and ('|BREAK|' in src) and ((name=="Positive" and bool(dynamic_pos)) or (name=="Negative" and bool(dynamic_neg))): |
| lines.append("static\n") |
| lines.append(expanded + "\n\n") |
| _append_block("Positive", positive_prompt, pos_text_expanded) |
| _append_block("Negative", negative_prompt, neg_text_expanded) |
| with open(path, 'w', encoding='utf-8') as f: |
| f.writelines(lines) |
| except Exception: |
| pass |
|
|
| |
| if slot_idx and bool(recipe_save): |
| data = _recipes_load() |
| data[str(slot_idx)] = { |
| "use_checkpoint": bool(use_checkpoint), |
| "checkpoint": checkpoint, |
| "clip_pos": int(clip_set_last_layer_positive), |
| "clip_neg": int(clip_set_last_layer_negative), |
| "pos_text": str(positive_prompt), |
| "neg_text": str(negative_prompt), |
| "loras": [ |
| [bool(use_lora_1), lora_1, float(strength_model_1), float(strength_clip_1)], |
| [bool(use_lora_2), lora_2, float(strength_model_2), float(strength_clip_2)], |
| [bool(use_lora_3), lora_3, float(strength_model_3), float(strength_clip_3)], |
| [bool(use_lora_4), lora_4, float(strength_model_4), float(strength_clip_4)], |
| [bool(use_lora_5), lora_5, float(strength_model_5), float(strength_clip_5)], |
| [bool(use_lora_6), lora_6, float(strength_model_6), float(strength_clip_6)], |
| ], |
| } |
| _recipes_save(data) |
| print(f"[CombiNode] Saved recipe Slot {slot_idx}.") |
|
|
| |
| return (model, src_clip if bool(standard_pipeline) else clip, cond_pos, cond_neg, vae) |
|
|
|
|
|
|
|
|
|
|