Code Implementation for Building a Pipeline for AI-Powered File Type Detection and Security Analysis with Magika and OpenAI

!pip install magika openai -q


import os, io, json, zipfile, textwrap, hashlib, tempfile, getpass
from pathlib import Path
from collections import Counter
from magika import Magika
from magika.types import MagikaResult, PredictionMode
from openai import OpenAI


# Prompt for the API key without echoing it, then build the shared OpenAI client.
print("🔑 Enter your OpenAI API key (input is hidden):")
api_key = getpass.getpass("OpenAI API Key: ")
client = OpenAI(api_key=api_key)


# Fail fast: listing models is a cheap call that proves the key is accepted
# before any later section tries to use the client.
try:
    client.models.list()
except Exception as e:
    raise SystemExit(f"❌ OpenAI connection failed: {e}") from e
else:
    print("✅ OpenAI connected successfully\n")


# Instantiate the detector once; every later section reuses this instance.
m = Magika()
print("✅ Magika loaded successfully\n")
print(f"   module version : {m.get_module_version()}")
print(f"   model name     : {m.get_model_name()}")
print(f"   output types   : {len(m.get_output_content_types())} supported labels\n")


def ask_gpt(system: str, user: str, model: str = "gpt-4o", max_tokens: int = 600) -> str:
    """Send one system + user prompt pair to the chat model and return its reply.

    Args:
        system: Text for the system message.
        user: Text for the user message.
        model: Chat model name passed to the API.
        max_tokens: Upper bound on tokens in the completion.

    Returns:
        The first choice's message text with surrounding whitespace removed,
        or an empty string when the API returns no text content.
    """
    resp = client.chat.completions.create(
        model=model,
        max_tokens=max_tokens,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
    )
    # `content` can be None (e.g. a refusal); avoid AttributeError on .strip().
    content = resp.choices[0].message.content
    return (content or "").strip()


print("=" * 60)
print("SECTION 1 — Core API + GPT Plain-Language Explanation")
print("=" * 60)


# Small in-memory payloads, one per file type, keyed by a human-readable name.
# The newline escapes matter: they make each payload look like a real
# multi-line file rather than one run-together line.
samples = {
    "Python":     b'import os\ndef greet(name):\n    print(f"Hello, {name}")\n',
    "JavaScript": b'const fetch = require("node-fetch");\nasync function getData() { return await fetch("/api"); }',
    "CSV":        b'name,age,city\nAlice,30,NYC\nBob,25,LA\n',
    "JSON":       b'{"name": "Alice", "scores": [10, 20, 30], "active": true}',
    "Shell":      b'#!/bin/bash\necho "Hello"\nfor i in $(seq 1 5); do echo $i; done',
    "PDF magic":  b'%PDF-1.4\n1 0 obj\n<< /Type /Catalog >>\nendobj\n',
    # ZIP local-file-header signature "PK\x03\x04" followed by 26 zero bytes.
    "ZIP magic":  bytes([0x50, 0x4B, 0x03, 0x04]) + bytes(26),
}


# Classify each in-memory sample and print one table row per result,
# collecting the detected labels for the GPT prompt that follows.
print(f"\n{'Label':<12} {'MIME Type':<30} {'Score':>6}")
print("-" * 52)
magika_labels = []
for raw in samples.values():
    res = m.identify_bytes(raw)
    magika_labels.append(res.output.label)
    print(f"{res.output.label:<12} {res.output.mime_type:<30} {res.score:>5.1%}")


# Ask the model for a short explanation, feeding it the labels Magika just produced.
explanation = ask_gpt(
    system="You are a concise ML engineer. Explain in 4–5 sentences.",
    user=(
        f"Magika is Google's AI file-type detector. It just identified these types from raw bytes: "
        f"{magika_labels}. Explain how a deep-learning model detects file types from "
        "just bytes, and why this beats relying on file extensions."
    ),
    max_tokens=250,
)
# Wrap the reply at 72 columns so it reads cleanly in notebook output.
print(f"\n💬 GPT on how Magika works:\n{textwrap.fill(explanation, 72)}\n")


print("=" * 60)
print("SECTION 2 — Batch Identification + GPT Summary")
print("=" * 60)


# Scratch directory for the sample files; it is not removed by this section.
tmp_dir = Path(tempfile.mkdtemp())

# File name -> raw content. The extensions are only labels for the report;
# the files are written to disk and classified by Magika in one batch below.
file_specs = {
    "code.py":     b"import sys\nprint(sys.version)\n",
    "style.css":   b"body { font-family: Arial; margin: 0; }\n",
    "data.json":   b'[{"id": 1, "val": "foo"}, {"id": 2, "val": "bar"}]',
    "script.sh":   b"#!/bin/sh\necho Hello World\n",
    "doc.html":    b"<!DOCTYPE html><html><body><h1>Hello</h1></body></html>",
    "config.yaml": b"server:\n  host: localhost\n  port: 8080\n",
    "query.sql":   b"CREATE TABLE t (id INT PRIMARY KEY, name TEXT);\n",
    "notes.md":    b"# Heading\n\n- item one\n- item two\n",
}

# Materialise every spec on disk, keeping the paths in insertion order so
# they can be zipped with the results afterwards.
paths = []
for fname, content in file_specs.items():
    p = tmp_dir / fname
    p.write_bytes(content)
    paths.append(p)

# Single batch call; results are paired with `paths` positionally below.
results = m.identify_paths(paths)
batch_summary = [
    {
        "file": p.name,
        "label": r.output.label,
        "group": r.output.group,
        "score": f"{r.score:.1%}",
    }
    for p, r in zip(paths, results)
]

print(f"\n{'File':<18} {'Label':<14} {'Group':<12} {'Score':>6}")
print("-" * 54)
for row in batch_summary:
    print(f"{row['file']:<18} {row['label']:<14} {row['group']:<12} {row['score']:>6}")

# Hand the structured summary to the model as JSON for a short risk-oriented read-out.
gpt_summary = ask_gpt(
    system="You are a DevSecOps expert. Be concise and practical.",
    user=(
        f"A file upload scanner detected these file types in a batch: "
        f"{json.dumps(batch_summary)}. "
        "In 3–4 sentences, summarise what kind of project this looks like "
        "and flag any file types that might warrant extra scrutiny."
    ),
    max_tokens=220,
)
print(f"\n💬 GPT project analysis:\n{textwrap.fill(gpt_summary, 72)}\n")

Leave a Comment