はじめに
「このPC、費用で落とせる?」「この工事、資産として計上すべき?」
経理部門では日々、このような判断が繰り返されています。特に固定資産の判定は、金額や品目、関連する費用などを複合的に評価する必要があり、専門知識と多くの時間を要する悩ましいタスクです。
しかし、現状のAIであれば、一つ一つのタスクは非常に高い精度で行えることが予想され、詳細なルールについてもjsonの参照やcsvファイルのログ保管による再学習など、なんとなくできそうな要素はそろっておりました。
こうして、見積書や請求書のPDFをAIで解析し、会計処理を自動で提案する「固定資産判定AIエージェント」の開発プロジェクトをスタートさせました。
目標は、再現性・説明可能性・監査耐性という3つの重要な要件を満たす、堅牢な「ハイブリッド判定」を実現できる機構を構築し、デプロイすることです。
1: PoCの成功と、見えてきた「3つの壁」
プロジェクトの初期段階、私たちは技術実証(PoC)として、`DocumentAnalysisAgent`と`AssetAdvisoryAgent`という2つのエージェントからなるシンプルなパイプラインを構築しました。その処理は以下の通りです。
1.1. PDF解析 (`DocumentAnalysisAgent`)
Google Cloud Document AIを使い、PDFから品目、数量、金額などを構造化データとして抽出する。
1.2. ルール判定 (`AssetAdvisoryAgent`)
抽出データに対し、「10万円未満なら費用」のような単純なルール(`rules.json`)で判定する。
1.3. LLM分類
ルールで判定できないものを、LLM(当時はGemini)に丸投げして分類させる。
この構成でプロトタイプはひとまずそれっぽく動きました。
しかし、実用するには、4つの大きな壁が立ちはだかっていることにすぐに気づきました。
A. 【グルーピングの欠落 - AIは「木を見て森を見ず」】
会計実務では、「PC本体(15万円)」と「設置工事(3万円)」は不可分なものとして合算し、「18万円の資産」として扱います。しかし、システムは各行を独立したものとしてしか見ないため、それぞれを「資産」「費用」と誤って判定してしまいました。
有利な判定をしすぎて、税務当局から厳しいご指摘を受けることは必至です。
B. 【ルール競合と保守性の限界 - 秘伝のタレ化した「if文」】
「30万円未満は費用(中小企業特例)」「10万円未満は費用」「ソフトウェアなら無形固定資産」といったルールが増えるにつれ、条件分岐が複雑怪奇な「スパゲッティ状態」に陥りました。特に、特例ルールと一般ルールが競合した場合、どちらを優先すべきかロジックが追いづらくなり、新たなルール追加が既存の判定に意図せぬ影響を及ぼすリスクが急増しました。複雑怪奇なフローチャートに良いことは一つもありません。
C. 【 LLMの気まぐれと説明責任 - 「AIがそう判断したので」は通用しない】
LLMは非常に優秀ですが、その動作は本質的に非決定的です。同じ入力でも、モデルの更新や内部的な確率の揺れで出力が変わることがあります。Aでも記載した通り、税務調査で「なぜこれは費用なのですか?」と問われた際に、「AIがそう判断したので」という答えは通用しません。判定の根拠を明確に提示できないことは、監査対応上、致命的すぎる欠陥でした。
2.構造の見直し - 信頼性への3つの工夫
これらの壁を乗り越えるため、構造を根本から見直し、AIの驚異的な能力を認めつつも、その挙動を人間が定めたルールと調和させ、コントロール可能な仕組みを目指しました。
工夫1:合議アンサンブルによる「資産グルーピング」 (`bundler_v2.py`)
最初の壁「グルーピング」を解決するため、bundler.pyを導入しました。これは、単なるキーワードマッチャーではなく、複数の専門家(チャネル)が寄ってたかって議論するような「多チャネル合議モデル」です。
# bundler.py (Ensemble Bundler v2)
import re, math
from typing import List, Dict, Any, Optional
# --- 1) 語彙(必要なら外部JSON化して運用で拡張) --------------------------
KW_PARENT_LIST = [
"本体","本機","PC","パソコン","ノートPC","サーバ","複合機","プリンタ",
"工作機械","ルータ","スイッチ","NAS","UPS","デスクトップ","ノートブック",
"タブレット","スキャナ","プロジェクタ","サーバー"
]
KW_INCIDENTAL_LIST = [
"設置","据付","据え付け","据付工事","配線","配線工事","搬入","納入","配送","運搬",
"初期設定","セットアップ","導入支援","キッティング","ライセンス発行費","検収",
"試運転","取付","養生","設置工事","据付費","導入作業"
]
KW_MAINT_LIST = [
"保守","メンテ","サポート","年間","月額","更新料","延長保証","SaaS",
"サブスク","クラウド","年間保守","年間利用料","ライセンス更新","サポート契約"
]
KW_REMOVAL_LIST = ["撤去","廃棄","解体","回収","下取り"] # 参考:別バンドルにしたい場合に使える
# 事前コンパイル(日本語は語境界が曖昧なので部分一致でOK、後段スコアで制御)
PARENT_RE = re.compile("|".join(map(re.escape, KW_PARENT_LIST)))
INCIDENTAL_RE = re.compile("|".join(map(re.escape, KW_INCIDENTAL_LIST)))
MAINT_RE = re.compile("|".join(map(re.escape, KW_MAINT_LIST)))
# --- 2) 軽量正規化 ---------------------------------------------------------
def normalize_ja(s: str) -> str:
if not s:
return ""
try:
# 任意: jaconv があれば全角→半角/カナ正規化
import jaconv
s = jaconv.z2h(s, ascii=True, digit=True, kana=True)
s = jaconv.normalize(s)
except Exception:
pass
s = s.lower()
s = s.replace(" ", " ")
s = s.replace("‐","-").replace("―","-").replace("ー","-") # 長音ゆれ
s = re.sub(r"\s+", " ", s)
return s.strip()
def tokenize_light(s: str) -> set:
# 依存無しの簡易トークナイザ(名詞中心の雰囲気)
s = normalize_ja(s)
return set(re.findall(r"[一-龥ぁ-んァ-ンa-zA-Z0-9]+", s))
def ngrams(s: str, n: int = 2) -> set:
s = re.sub(r"\s", "", normalize_ja(s))
return set(s[i:i+n] for i in range(max(0, len(s)-n+1)))
# --- 3) 各チャネルのスコア -------------------------------------------------
def keyword_score(desc: str, rx: re.Pattern) -> float:
return 1.0 if rx.search(desc or "") else 0.0
def jaccard(a: set, b: set) -> float:
if not a or not b:
return 0.0
return len(a & b) / len(a | b)
def proximity_score(i: int, j: int, max_win: int = 6) -> float:
d = abs(i - j)
return max(0.0, 1.0 - d / max_win)
def amount_ratio_score(child_amt: float, parent_amt: float) -> float:
if parent_amt <= 0 or child_amt <= 0:
return 0.0
r = child_amt / parent_amt
# 1%〜30%で最大になるガウス(取得付随費用の典型レンジ)
mu, sigma = 0.15, 0.10
return math.exp(-((r - mu) ** 2) / (2 * sigma ** 2))
def embed_sim_score(vp: Optional[list], vc: Optional[list]) -> float:
# 任意: 事前に単位ベクトル化した embedding を lines[*]["embed"] に入れておくと有効化
if not vp or not vc:
return 0.0
# 安全のため次元の最小に合わせる
d = min(len(vp), len(vc))
if d == 0:
return 0.0
dot = sum(vp[i] * vc[i] for i in range(d))
return float(dot)
# --- 4) メイン:多重合議バンドル -------------------------------------------
def bundle_lines(
lines: List[Dict[str, Any]],
group_window: int = 6,
parent_top_amount_ratio: float = 0.5,
weights: Dict[str, float] = None,
thresholds: Dict[str, float] = None
) -> List[Dict[str, Any]]:
"""
lines: [{line_uid, description, amount, page, row_index, vendor, embed(optional)}, ...]
return: 各行に {bundle_id, role(parent|incidental|maintenance|solo), _bundle_score, _bundle_trace} を付与
"""
if weights is None:
weights = {
"kw": 0.30, "fuzzy": 0.10, "morph": 0.15,
"embed": 0.15, "prox": 0.10, "ratio": 0.15, "vendor": 0.05
}
if thresholds is None:
thresholds = {"incidental": 0.70, "maintenance": 0.55}
n = len(lines)
# 初期化 & 安全なUID
for idx, ln in enumerate(lines):
ln.setdefault("bundle_id", None)
ln.setdefault("role", "solo")
ln.setdefault("line_uid", f"temp_uid_{idx}")
ln.setdefault("row_index", idx)
ln.setdefault("page", 1)
# 金額上位を親候補にしつつ、親辞書も見る(取りこぼし対策)
amounts = [(i, float(lines[i].get("amount") or 0.0)) for i in range(n)]
if amounts:
top_amt = max(a for _, a in amounts) or 0.0
else:
top_amt = 0.0
parent_candidates = []
for i, amt in amounts:
desc_i = normalize_ja(lines[i].get("description") or "")
if keyword_score(desc_i, PARENT_RE) or (amt >= top_amt * parent_top_amount_ratio and amt > 0):
parent_candidates.append(i)
# 特徴量の事前計算
tokens = [tokenize_light(ln.get("description") or "") for ln in lines]
grams2 = [ngrams(ln.get("description") or "", 2) for ln in lines]
# 役割付与(多重親にも対応するが、最終的にスコア最大の親にひもづけ)
proposals: Dict[int, Dict[int, Dict[str, Any]]] = {} # parent_idx -> child_idx -> proposal
for p in parent_candidates:
parent_amt = float(lines[p].get("amount") or 0.0)
desc_p = normalize_ja(lines[p].get("description") or "")
token_p = tokens[p]; gram_p = grams2[p]
vendor_p = lines[p].get("vendor")
embed_p = lines[p].get("embed")
# 親自身
proposals.setdefault(p, {})
proposals[p][p] = {
"role": "parent",
"score": 1.0,
"trace": {"kw": keyword_score(desc_p, PARENT_RE)}
}
# 近傍探索(前後)
for j in range(max(0, p - group_window), min(n, p + group_window + 1)):
if j == p:
continue
if lines[j].get("bundle_id") is not None and lines[j].get("role") == "parent":
continue
desc_c = normalize_ja(lines[j].get("description") or "")
token_c = tokens[j]; gram_c = grams2[j]
embed_c = lines[j].get("embed")
s_kw_inc = keyword_score(desc_c, INCIDENTAL_RE)
s_kw_maint= keyword_score(desc_c, MAINT_RE)
s_fuzzy = jaccard(gram_p, gram_c)
s_morph = jaccard(token_p, token_c)
s_embed = embed_sim_score(embed_p, embed_c)
s_prox = proximity_score(lines[p]["row_index"], lines[j]["row_index"], max_win=group_window)
s_ratio = amount_ratio_score(float(lines[j].get("amount") or 0.0), parent_amt)
s_vendor = 1.0 if vendor_p and vendor_p == lines[j].get("vendor") else 0.0
score = (
weights["kw"] * max(s_kw_inc, s_kw_maint) +
weights["fuzzy"]* s_fuzzy +
weights["morph"]* s_morph +
weights["embed"]* s_embed +
weights["prox"] * s_prox +
weights["ratio"]* s_ratio +
weights["vendor"]* s_vendor
)
# maintenance は incidental に吸われないよう優先タグをもたせる
role = None
if s_kw_maint >= 0.5 and score >= thresholds["maintenance"]:
role = "maintenance"
elif score >= thresholds["incidental"]:
role = "incidental"
if role:
proposals[p][j] = {
"role": role,
"score": round(float(score), 3),
"trace": {
"kw_inc": s_kw_inc, "kw_maint": s_kw_maint,
"fuzzy": s_fuzzy, "morph": s_morph, "embed": s_embed,
"prox": s_prox, "ratio": s_ratio, "vendor": s_vendor
}
}
# 競合解決:同じ子に複数の親から提案が来たら、score最大の親に紐づけ
bundle_id = 0
best_parent_for_child: Dict[int, Dict[str, Any]] = {} # child_idx -> {p, role, score, trace}
for p, children in proposals.items():
# 親自体の確定
if p not in best_parent_for_child or children[p]["score"] > best_parent_for_child[p].get("score", 0):
best_parent_for_child[p] = {"p": p, **children[p]}
for j, prop in children.items():
if j == p:
continue
if (j not in best_parent_for_child) or (prop["score"] > best_parent_for_child[j]["score"]):
best_parent_for_child[j] = {"p": p, **prop}
# 親ごとに bundle_id を振る
parent_to_bundle: Dict[int, int] = {}
for child_idx, info in best_parent_for_child.items():
p = info["p"]
if p not in parent_to_bundle:
bundle_id += 1
parent_to_bundle[p] = bundle_id
# 最終アサイン
for child_idx, info in best_parent_for_child.items():
p = info["p"]
bid = parent_to_bundle[p]
role = info["role"]
lines[p]["bundle_id"] = bid
lines[p]["role"] = "parent"
lines[p]["_bundle_score"] = 1.0
lines[p]["_bundle_trace"] = {"kw_parent": keyword_score(normalize_ja(lines[p].get("description") or ""), PARENT_RE)}
# 親以外
if child_idx != p:
lines[child_idx]["bundle_id"] = bid
lines[child_idx]["role"] = role
lines[child_idx]["_bundle_score"] = info["score"]
lines[child_idx]["_bundle_trace"] = info["trace"]
# 未所属は solo
for ln in lines:
ln.setdefault("bundle_id", None)
ln.setdefault("role", "solo")
ln.setdefault("_bundle_score", 0.0)
# ガード:子が親の60%以上の金額→別物疑いで incidental を抑制(誤検出減)
for ln in lines:
if ln["role"] in ("incidental",):
bid = ln["bundle_id"]
if not bid:
continue
# 親を探す
parent_amt = None
for x in lines:
if x.get("bundle_id") == bid and x.get("role") == "parent":
parent_amt = float(x.get("amount") or 0.0)
break
if parent_amt and float(ln.get("amount") or 0.0) >= parent_amt * 0.60:
# 役割降格
ln["role"] = "solo"
ln["_bundle_trace"] = {**ln.get("_bundle_trace", {}), "guard": "child>=60% parent → demote to solo"}
return lines
- *キーワード* 「本体」「設置工事」といった直接的な単語をチェック。
- *類似度* 文字列の類似度(Jaccard係数など)で表記ゆれを評価。
- *金額比率* 親品目に対する付随費用の金額バランスを評価。
- *意味ベクトル* 単語の意味ベクトル(Embedding)で意味的な関連性を評価。
これらの要素を統合すれば、システムは単なる「点」(行)ではなく、「面」(資産グループ)で物事を捉えられるようになると思われますが、語彙の量や正確さを考慮すると、まだまだ課題があります。
工夫2:「優先度リスト」方式の厳格なルールエンジン (`rule_engine_strict.py`)
次の壁である「優先度」の問題の解決にあたって、資本的支出をはじめとした、フローチャート型の判定をルール化するには、ルールエンジンの存在が不可欠です。rule_engine_strict.pyにより、途中の根拠を残して結論に至る人間を模倣してみました。
import json
import os
import csv
from datetime import datetime
import uuid
import sys
import ast
import re
import unicodedata
from typing import Any, Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, field
from copy import deepcopy
# ---- RuleEngineError Definitions ----
class RuleEngineError(Exception):
pass
class RuleEngineEvalError(RuleEngineError):
def __init__(self, expression: str, reason: str, node_id: Optional[str] = None):
super().__init__(f"EvalError at node={node_id}: {reason} in expression: {expression}")
self.expression = expression
self.reason = reason
self.node_id = node_id
class RuleEngineStructureError(RuleEngineError):
pass
# ---- Safe evaluator (AST) ----
class _SafeEvaluator(ast.NodeVisitor):
ALLOWED_BINOPS = (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod, ast.FloorDiv)
ALLOWED_CMPOPS = (ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE)
ALLOWED_BOOLOPS = (ast.And, ast.Or)
ALLOWED_UNARYOPS = (ast.Not, ast.USub, ast.UAdd)
def __init__(self, context: Dict[str, Any], functions: Dict[str, Any], node_id: Optional[str], expr: str):
self.ctx = context
self.funcs = functions
self.node_id = node_id
self.expr = expr
def generic_visit(self, node):
raise RuleEngineEvalError(self.expr, f"Disallowed syntax: {type(node).__name__}", self.node_id)
def visit_Expression(self, node: ast.Expression):
return self.visit(node.body)
def visit_Name(self, node: ast.Name):
if node.id in {"True", "False", "None"}:
return {"True": True, "False": False, "None": None}[node.id]
if node.id in self.ctx:
return self.ctx[node.id]
raise RuleEngineEvalError(self.expr, f"Unknown variable '{node.id}'", self.node_id)
def visit_Constant(self, node: ast.Constant):
return node.value
def visit_BoolOp(self, node: ast.BoolOp):
if not isinstance(node.op, self.ALLOWED_BOOLOPS):
raise RuleEngineEvalError(self.expr, "BoolOp not allowed", self.node_id)
if isinstance(node.op, ast.And):
val = True
for v in node.values:
val = bool(val) and bool(self.visit(v))
return val
if isinstance(node.op, ast.Or):
val = False
for v in node.values:
val = bool(val) or bool(self.visit(v))
return val
def visit_UnaryOp(self, node: ast.UnaryOp):
if not isinstance(node.op, self.ALLOWED_UNARYOPS):
raise RuleEngineEvalError(self.expr, "UnaryOp not allowed", self.node_id)
operand = self.visit(node.operand)
if isinstance(node.op, ast.Not):
return not bool(operand)
if isinstance(node.op, ast.USub):
return -operand
if isinstance(node.op, ast.UAdd):
return +operand
def visit_BinOp(self, node: ast.BinOp):
if not isinstance(node.op, self.ALLOWED_BINOPS):
raise RuleEngineEvalError(self.expr, "BinOp not allowed", self.node_id)
left = self.visit(node.left)
right = self.visit(node.right)
try:
if isinstance(node.op, ast.Add): return left + right
if isinstance(node.op, ast.Sub): return left - right
if isinstance(node.op, ast.Mult): return left * right
if isinstance(node.op, ast.Div): return left / right
if isinstance(node.op, ast.Mod): return left % right
if isinstance(node.op, ast.FloorDiv): return left // right
except Exception as e:
raise RuleEngineEvalError(self.expr, f"BinOp error: {e}", self.node_id)
def visit_Compare(self, node: ast.Compare):
left = self.visit(node.left)
result = True
for op, comparator in zip(node.ops, node.comparators):
right = self.visit(comparator)
if isinstance(op, ast.Eq): ok = (left == right)
elif isinstance(op, ast.NotEq): ok = (left != right)
elif isinstance(op, ast.Lt): ok = (left < right)
elif isinstance(op, ast.LtE): ok = (left <= right)
elif isinstance(op, ast.Gt): ok = (left > right)
elif isinstance(op, ast.GtE): ok = (left >= right)
else:
raise RuleEngineEvalError(self.expr, "CmpOp not allowed", self.node_id)
result = result and ok
left = right
return result
def visit_Call(self, node: ast.Call):
if not isinstance(node.func, ast.Name):
raise RuleEngineEvalError(self.expr, "Only plain function calls allowed", self.node_id)
fname = node.func.id
if fname not in self.funcs:
raise RuleEngineEvalError(self.expr, f"Function '{fname}' not allowed", self.node_id)
func = self.funcs[fname]
args = [self.visit(a) for a in node.args]
kwargs = {kw.arg: self.visit(kw.value) for kw in node.keywords}
try:
return func(*args, **kwargs)
except Exception as e:
raise RuleEngineEvalError(self.expr, f"Error in call {fname}: {e}", self.node_id)
def _safe_eval(expr: str, context: Dict[str, Any], functions: Dict[str, Any], node_id: Optional[str] = None) -> Any:
try:
tree = ast.parse(expr, mode='eval')
except SyntaxError as e:
raise RuleEngineEvalError(expr, f"SyntaxError: {e}", node_id)
return _SafeEvaluator(context, functions, node_id, expr).visit(tree)
@dataclass
class EngineTrace:
path: List[str] = field(default_factory=list)
decisions: List[Dict[str, Any]] = field(default_factory=list) # {node, type, detail}
class RuleEngineStrict:
def __init__(self, rules: Dict[str, Any]):
self.rules = rules
self.keyword_sets = rules.get("keyword_sets", {})
self.nodes = rules.get("nodes", {})
self.start_node = rules.get("start_node")
if not self.start_node or self.start_node not in self.nodes:
raise RuleEngineStructureError("start_node is missing or not found in nodes")
self._validate_structure()
def _validate_structure(self):
# Basic structural checks
for node_id, node in self.nodes.items():
ntype = node.get("type")
if not ntype:
raise RuleEngineStructureError(f"Node '{node_id}' missing 'type'")
if ntype == "evaluation_node":
conds = node.get("conditions")
if not isinstance(conds, list) or not conds:
raise RuleEngineStructureError(f"evaluation_node '{node_id}' must have non-empty 'conditions' list")
# Pre-parse expressions to detect syntax errors early
for c in conds:
expr = c.get("expression", "")
if not isinstance(expr, str) or not expr.strip():
raise RuleEngineStructureError(f"evaluation_node '{node_id}' has empty expression")
try:
ast.parse(expr, mode='eval')
except SyntaxError as e:
raise RuleEngineStructureError(f"Invalid expression at node '{node_id}': {expr} / {e}")
next_node = c.get("next_node")
if next_node and next_node not in self.nodes:
raise RuleEngineStructureError(f"evaluation_node '{node_id}' points to unknown next_node '{next_node}'")
elif ntype == "question":
answers = node.get("answers")
if not isinstance(answers, list) or not answers:
raise RuleEngineStructureError(f"question '{node_id}' must have answers")
for ans in answers:
nxt = ans.get("next_node")
if nxt and nxt not in self.nodes:
raise RuleEngineStructureError(f"question '{node_id}' answer points to unknown next_node '{nxt}'")
elif ntype == "value_input_question":
# next_node must exist
nxt = node.get("next_node")
if nxt and nxt not in self.nodes:
raise RuleEngineStructureError(f"value_input_question '{node_id}' points to unknown next_node '{nxt}'")
elif ntype == "conclusion":
# ok
pass
else:
raise RuleEngineStructureError(f"Unknown node type '{ntype}' at '{node_id}'")
# ---- helpers exposed to expressions ----
def _check_keywords(self, description_text: str, keyword_set_name: str) -> bool:
if description_text is None:
return False
text = str(description_text).lower()
kws = self.keyword_sets.get(keyword_set_name, [])
return any(k.lower() in text for k in kws)
def _auto_logic_answer(self, node: Dict[str, Any], ctx: Dict[str, Any]) -> Optional[str]:
auto = node.get("auto_logic")
if not auto:
return None
typ = auto.get("type")
variable = auto.get("variable")
text = str(ctx.get(variable, "") or "")
if typ == "keyword_match":
set_name = auto.get("keyword_set_ref")
neg = set(auto.get("negative_keywords", []))
has_neg = any(n in text for n in neg) if neg else False
matched = self._check_keywords(text, set_name) if set_name else False
return auto.get("answer_if_match") if (matched and not has_neg) else auto.get("answer_if_not_match")
if typ == "keyword_map":
# Try in order
for m in auto.get("answer_map", []):
if self._check_keywords(text, m.get("keyword_set_ref", "")):
return m.get("answer")
return auto.get("default_answer")
return None
def run(self, context: Dict[str, Any], *, collect_trace: bool = True) -> Dict[str, Any]:
trace = EngineTrace()
node_id = self.start_node
ctx = dict(context) # shallow copy
functions = {"check_keywords": self._check_keywords}
while True:
node = self.nodes.get(node_id)
if not node:
raise RuleEngineStructureError(f"Node '{node_id}' not found during run")
trace.path.append(node_id)
ntype = node.get("type")
if ntype == "conclusion":
return {
"status": "concluded",
"node_id": node_id,
"conclusion": node.get("text"),
"reason": node.get("reason"),
"trace": trace.__dict__ if collect_trace else None,
}
if ntype == "value_input_question":
# Non-interactive mode: require the value to be present in context
var = node.get("variable_name")
if var not in ctx or ctx.get(var) is None:
return {
"status": "needs_input",
"node_id": node_id,
"message": f"Missing required input variable '{var}' for node '{node_id}'",
"trace": trace.__dict__ if collect_trace else None,
}
# If present, just forward to next
node_id = node.get("next_node")
trace.decisions.append({"node": node_id, "type": "value_input_passthrough", "variable": var})
continue
if ntype == "question":
# Attempt auto_logic; if none, fallback to heuristic for specific known nodes
auto_ans = self._auto_logic_answer(node, ctx)
if auto_ans is None:
# Heuristic: if node asks whether amount known, use presence of ctx['expenditure_amount']
if node_id == "q_knows_expenditure":
auto_ans = "はい" if ctx.get("expenditure_amount") is not None else "いいえ"
else:
return {
"status": "undetermined",
"node_id": node_id,
"message": f"Question node '{node_id}' lacks auto_logic in non-interactive mode",
"trace": trace.__dict__ if collect_trace else None,
}
# Find answer route
next_node = None
for ans in node.get("answers", []):
if ans.get("label") == auto_ans:
next_node = ans.get("next_node")
break
if not next_node:
return {
"status": "undetermined",
"node_id": node_id,
"message": f"Auto answer '{auto_ans}' not found in answers at node '{node_id}'",
"trace": trace.__dict__ if collect_trace else None,
}
trace.decisions.append({"node": node_id, "type": "question_auto", "answer": auto_ans})
node_id = next_node
continue
if ntype == "evaluation_node":
conditions = node.get("conditions", [])
matched = False
for cond in conditions:
expr = cond.get("expression", "")
next_node = cond.get("next_node")
try:
result = bool(_safe_eval(expr, ctx, functions, node_id))
except RuleEngineEvalError as e:
return {
"status": "error",
"node_id": node_id,
"message": str(e),
"expression": e.expression,
"trace": trace.__dict__ if collect_trace else None,
}
if result:
trace.decisions.append({"node": node_id, "type": "evaluation", "expression": expr, "result": True})
node_id = next_node
matched = True
break
else:
trace.decisions.append({"node": node_id, "type": "evaluation", "expression": expr, "result": False})
if not matched:
return {
"status": "undetermined",
"node_id": node_id,
"message": f"No condition matched at evaluation_node '{node_id}'",
"trace": trace.__dict__ if collect_trace else None,
}
continue
# Unknown type
raise RuleEngineStructureError(f"Unsupported node type '{ntype}' at '{node_id}'")
「安全で監査しやすい判定エンジン(ルール実行器)」をイメージしており、外部(JSON 等)で定義した分岐ルールを 決定木 として実行し、途中経路と根拠を残しつつ結論に到達させるための基盤となっています。固定資産判定や税務・社内規程の適用可否のように「説明責任と再現性」が要る領域に向いた設計になっているのではないでしょうか?
固定資産判定で最もぶち当たるといっても過言ではない固定資産 vs 修繕費の判定については、金額閾値・資本的支出キーワード・期間/効果の基準・取得付随費用か否かなどをrules.jsonをベースに条件式 として表現し、結論テキスト+理由を返すのがベターという認識です。また、取得付随費用の切り分けについては、先に bundler.py で 親行と付随行/保守行にバンドルしてから、結果を context に入れて本エンジンで 会計処理区分を確定させに行くことで、グルーピング問題の解決を図っています。
(例)is_incidental=True and amount_ratio<=0.3 等。
同様に、会計基準以外の社内規程・稟議条件も適用可能なはずですね。企業によって○○円以上は申請が必要といった条項はあると思います。
工夫3:「契約ベースLLM」による再現性の確保 (`llm_classifier.py`)
3つ目の壁「LLMの信頼性」を克服するには、さらに別の機構が必要です。llm_classifier.pyにより、LLMを「気まぐれな天才」から「堅実なコンポーネント」へと変容させるための仕組みが必要でした。
# llm_classifier.py
import hashlib
import json
import time
import os
import google.generativeai as genai
from jsonschema import validate, ValidationError
# このファイルがどこにあってもschemas.pyを正しく参照するためのパス設定
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from schemas import LLM_CLASSIFY_SCHEMA
PROMPT_VERSION = "clf_v3.2" # few-shotやプロンプトロジック更新時にインクリメント
# LLMクライアントの初期化(一度だけ行う)
# APIキーは環境変数から読み込むのが望ましい
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("環境変数 'GEMINI_API_KEY' が設定されていません。")
genai.configure(api_key=api_key)
# model_nameは固定
llm_model = genai.GenerativeModel('gemini-2.5-pro-latest')
def _hash_prompt(s: str) -> str:
"""プロンプトのハッシュ値を計算して、トレーサビリティを確保する"""
return "sha256:" + hashlib.sha256(s.encode("utf-8")).hexdigest()[:16]
def build_prompt_contract(item, company_ctx, period_ctx, rule_trace) -> str:
"""
LLMに渡すための、バージョン管理された決定的なプロンプト(契約)を構築する。
item: 正規化済み行データ {description, amount, quantity, unit, ...}
rule_trace: ルール評価結果(不決定時のみLLMへ)
"""
# 温度0/関数型出力前提の“契約”プロンプト
# few-shot examplesはここに埋め込むか、外部ファイルから読み込む
return f'''
You are a deterministic Japanese accounting classifier. Output ONLY JSON that adheres to the specified schema.
No extra text, comments, or markdown should be included in the output.
# Schema:
{json.dumps(LLM_CLASSIFY_SCHEMA, indent=2)}
# Decision Policy (Japanese SME Context):
- <100,000 JPY: expense (即時費用).
- 100,000–200,000 JPY (non-SME): Can be 'pooled_3yr' (一括償却資産). Treat as 'review' to let user decide.
- SME Special Rule: Can expense <300,000 JPY if annual cap is not exceeded. If cap status is unknown, treat as 'review'.
- Software: Perpetual license or significant in-house development is 'asset'. SaaS/subscriptions are 'expense'.
- Repair vs. Capital Improvement: If it significantly increases value or service life, it is 'asset'; otherwise 'expense'.
- Bundled Items: Installation/shipping costs accompanying a main asset are part of the asset's acquisition cost ('asset').
# Input Data:
```json
{json.dumps({ "item": item, "company_ctx": company_ctx, "period_ctx": period_ctx, "rule_trace": rule_trace, "prompt_version": PROMPT_VERSION }, ensure_ascii=False, indent=2)}
```
Respond with JSON only.
'''.strip()
def classify_with_llm(item, company_ctx, period_ctx, rule_trace, max_retries=2):
"""
LLMを使用して項目を分類する。リトライとスキーマ検証機能を持つ。
"""
prompt = build_prompt_contract(item, company_ctx, period_ctx, rule_trace)
prompt_hash = _hash_prompt(prompt)
for attempt in range(max_retries + 1):
try:
# ① 決定的パラメータ(温度0 / top_p低)でLLMを呼び出し
response = llm_model.generate_content(
prompt,
generation_config=genai.types.GenerationConfig(
temperature=0.0,
top_p=0.1,
max_output_tokens=1024,
# レスポンス形式をJSONに指定 (gemini-1.5-pro-latestで有効)
response_mime_type="application/json",
)
)
resp_text = response.text
# ② JSONパース&スキーマ検証
obj = json.loads(resp_text)
validate(instance=obj, schema=LLM_CLASSIFY_SCHEMA)
# ③ バージョンが一致しているか確認
if obj.get("prompt_version") != PROMPT_VERSION:
# バージョンが異なる場合は強制的に上書きするか、エラーとして扱う
# ここでは警告を出しつつ、データをtraceに残す
print(f"Warning: LLM returned prompt_version '{obj.get('prompt_version')}', expected '{PROMPT_VERSION}'.")
obj["prompt_version"] = PROMPT_VERSION # 強制上書き
# ④ トレース用メタデータを付与
obj["_trace"] = {
"prompt_hash": prompt_hash,
"model_name": llm_model.model_name,
"attempt": attempt
}
return obj
except (json.JSONDecodeError, ValidationError, Exception) as e:
print(f"LLM classification attempt {attempt + 1} failed: {e}")
time.sleep(0.5) # 短い待機時間
continue
# ⑤ 全てのリトライが失敗した場合は“要確認”にフォールバック(再現性担保)
return {
"decision": "review",
"reason": f"LLM output failed schema validation after {max_retries + 1} attempts. Manual review required.",
"confidence": 0.0,
"prompt_version": PROMPT_VERSION,
"_trace": {
"prompt_hash": prompt_hash,
"model_name": llm_model.model_name,
"attempt": "exhausted"
}
}
- *温度0の徹底* LLMの創造性を封じ、出力のランダム性を排除。
- *厳格なJSONスキーマ(`schemas.py`)* LLMの出力形式を「契約書」として厳密に定義。「`decision`は`asset`, `expense`, `review`のいずれか」といった制約を課しています。
- *検証とフォールバック* LLMの出力がスキーマ(契約)に違反した場合、即座にエラーとみなし、数回リトライしてみます。それでも失敗する場合は、判定を諦めて人間に判断を促す「`review`」というわからない状態への出口も必要です。
- *プロンプトのバージョン管理* プロンプト自体も`prompt_hash`で管理し、いつ、どの「契約書」で判定したかを記録します。
これにより、「同じ入力に対しては、何度やっても同じ結果が返ってくる」という完全な再現性と、システムが暴走しない安全性を確保します。
ここまで、いかにして人の判断を堅牢な形で実装するかに思考が向いていたのですが、行き詰ったのはもっと単純な部分でした。
用意したデモのPDFを正しく読み取れなかったのです。
いかに堅牢なルールを用意しても、そもそも適切な日本語を渡せなければ無意味です。
誤ったデータからは誤ったデータしか生まれません。
3: ルール以前の話?OCRデータ精度の壁
後続の判定ロジックがいかに優れていても、入り口となるPDFの読み取り精度が低ければ、すべては「砂上の楼閣」です。ただOCR APIを呼ぶだけでは、実世界の多様で"汚い"PDFに対応できないことを痛感し、PDF解析部分を5つの層からなる堅牢なパイプラインへと再設計しました。
- *Layer 1: 入力の前処理 (`pdf_preprocess.py`)*
パイプラインの「門番」として、入力されたPDFが破損していないかチェックし、傾きを補正するなど、後続のOCRエンジンが最も性能を発揮できるクリーンな状態に整えるクレンジングの役割を果たします。
- *Layer 2: OCRエンジンとの対話 (`docai_wrapper.py`)*
外部のGoogle Document AIサービスと通信する「外交官」として、。APIキーの管理、リクエストのリトライ、タイムアウト処理など、外部サービスとの連携に伴う煩雑な処理をすべてこの層に集約します。
- *Layer 3: 構造の解析 (`document_analyzer.py`)*
OCRから返された単なる文字と座標情報の羅列に意味を与える「捜査官」として、テキストブロックをグルーピングし、段落やセクションを識別して、文書全体のレイアウトを解析します。
- *Layer 4: テーブルの再構築 (`table_reconstructor.py`)*
請求書の心臓部である「明細テーブル」を正確に復元する「分析者」として、行と列の関係を正確に把握し、品名、数量、単価、金額が正しく紐付いた、信頼性の高いテーブル構造を再構築します。
- *Layer 5: AIによるインテリジェントな補完 (`invoice_inferencer.py`)*
最終的なデータ品質を保証する「品質管理」として、これまでの層で構造化されたデータに対し、OCRで読み取れなかった品名などの欠損値がないかを確認。必要に応じてLLMへの問い合わせを行い、文脈に基づいた推論によってデータを補完・修正します。
...これにより、後続のビジネスロジックが利用するデータの完全性が飛躍的に向上するはずだったのですが、私自身の勉強不足もあってうまく理想を実現できていません。
アイデアベースでもよいので思うことがあった方はぜひコメントください!
3.5 現在の全体像
[入力: 見積/請求/台帳 行データ]
│ 正規化 (normalize_ja, token, n-gram, embed(任意))
▼
(1) Bundling 層 …… 役割付与とグルーピング
- 親(Parent)候補抽出(キーワード or 高額)
- 近傍ウィンドウ内で子をスコアリング
* キーワード/類似度/近接/金額比/ベンダ一致/埋め込み
- 競合解決 → bundle_id 付与
- ガード: 子が親の60%以上 → “solo”で扱う
└→ 出力: 各行 {bundle_id, role, _bundle_score, _bundle_trace}
▼(bundle結果を特徴量に反映)
(2) RuleEngine 層 …… 決定木で“確定/要入力/不決定/エラー”
- ルール/ノードはJSON外出し(policy-as-code)
- 安全AST評価(許可演算のみ)
- questionのauto_logicで非対話バッチ可
└→ 結果:
a) concluded: {decision, reason, trace}
b) needs_input: 入力欠如
c) undetermined: 条件非一致
d) error: 式評価等の例外
▼(aなら確定、b/c/d は補完候補)
(3) LLM 補完層 …… “不決定のみ”を厳格制約で照会
- 決定方針(日本の実務)をプロンプトに明記
- JSONスキーマ固定 + temperature=0 + top_p低
- スキーマ検証 & リトライ、バージョン/ハッシュ追跡
└→ 出力: {decision|review, reason, confidence, _trace}
▼
(4) 集約/書き出し層
- 行→バンドル→仕訳/注記に集約
- “incidental/maintenance”の扱いを親の決定に従属
- review はワークキューへ
- 監査ログ(全trace)保存、CSV/Excel/DB 反映
おわりに
G検定を取得して、実際に自分の役に立つものを形にしたかったので、ここまでの道のりは単にコードを書くこと以上の自分自身の挑戦です。AIに任せきるのではなく、不確実性をいかに制御し、人間の定めたルールと調和させ、実世界の業務に耐えうる堅牢なシステムを設計するかという、アーキテクチャ探求の難しさにぶつかっています。
今後は、ユーザーによる修正ログ(`corrections_log.csv`)をAIが自律的に学習し、語彙を自己拡張していく機能や、複数のOCRエンジン(PaddleOCR, Tesseract等)を並走させて精度を高める「マルチOCRパイプライン」の実装、判定後の各資産に関して、過去の教師データに基づく耐用年数・使用部門の自動分類など、利便性を高めるうえでまだまだできることはあると痛感しています。
まだまだアイデアベースで実装には程遠いですが、この開発日誌が、これから同様の課題に挑むエンジニアや経理マンの方々にとって、少しでも参考になれば幸いです。
《追記》
もとになる記事。皆様リアクションありがとうございました。