パイプライン(ETL)
データの取り込み・変換・出力をステップで組み合わせて自動化します。複雑な統合ツールではなく、シンプルな UI で AI とのデータ連携に特化しています。
仕組み
- ステップを上から順に実行します。データは**行の集まり(list[dict])**として各ステップを流れます。
- ステップは大きく 取得(source)→ 加工(transform)→ 出力(load / action) に分かれます。
- 外部サービスの認証情報は 接続管理(ApiConnection) に保存し、ステップから参照します(他人の接続は管理者以外使えません)。
- 値の差し込みにはテンプレートが使えます:
{{ $json.列名 }}/{{ $today }}/{{ $workflow.name }}/{{ $env.変数名 }}/{{ $node["ステップ名"].json[0].列 }}。
共通の設定(全ステップ)
各ステップには、失敗時の挙動を制御する共通オプションがあります。
| 項目 | 既定 | 説明 |
|---|---|---|
| リトライ | off | 失敗時に再試行する |
| 最大リトライ回数 | 3 | 再試行の回数 |
| リトライ間隔(ms) | 1000 | 再試行までの待ち時間 |
| エラー時の挙動 | stop | stop(中断)/ continue(無視して継続)/ continue_with_error_output(エラー行を出力して継続) |
オペレータ一覧
ファイル・API 連携
| 表示名 | キー | 役割 |
|---|---|---|
| ファイル取得 | source_file | CSV / Excel / JSON / JSONL / PDF / TSV / DOCX などを読込(フォルダ・glob・接続 URI・仮想パス対応) |
| ファイル保存 | load_file | CSV / TSV / JSON / JSONL / MD / Excel / gzip で書き出し |
| ファイル差分取得 | source_file_landed | 前回成功以降に届いた新規ファイルだけを取得(差分トリガ) |
| RSS 取得 | source_rss | RSS / Atom フィードを取得 |
| REST API 取得 | source_rest | REST をページング・認証つきで取得 |
| REST API 送信 | action_http_request | 各行ごとに HTTP リクエストを送信し、応答を行に付与 |
| HTTP ファイル取得 | source_http_files | HTTP の一覧ページ / 単一 URL からファイル取得 |
| Webhook 受信 | source_webhook | Webhook 受信を有効化 |
| Webhook 送信 | action_webhook | Webhook URL へ署名・リトライつき送信 |
| rsync 取得 / 送信 | source_rsync / load_rsync | リモートと rsync で同期 |
| メール送信 | action_email | SMTP でメール送信 |
SQL・データ蓄積
| 表示名 | キー | 役割 |
|---|---|---|
| SQL 取得 (SELECT) | source_sql | SELECT / WITH のみ。クエリ直書き or テーブル+条件。差分取得対応 |
| SQL 保存 (INSERT) | load_sql | append / replace / merge。テーブル自動作成・バッチ投入 |
| SQL 更新 (UPDATE) | update_sql | 主キーで行単位 UPDATE |
| SQL 実行 (任意) | execute_sql | 任意の DDL / DML を実行 |
| BigQuery 取得 / 書込 | source_bigquery / load_bigquery | BigQuery をクエリ / ストリーム挿入(SA JSON 認証) |
| ClickHouse 取得 / 書込 | source_clickhouse / load_clickhouse | ClickHouse の読み書き |
| DataLake に保存 | load_postgres_dataset | 内蔵 Data Lake に保存(SQL / BI や BI キーで参照) |
| RAG に保存 | load_to_rag | 行をまとめて 1 文書としてナレッジに埋め込み |
| RAG から検索 | source_rag_retrieve | ナレッジから類似チャンクを取得 |
データ加工
| 表示名 | キー | 役割 |
|---|---|---|
| 行の絞り込み | transform_filter | 条件で行を残す / 除く |
| 列の編集 | transform_set | 列の設定 / 改名 / 削除 / 指定列だけ残す |
| 列名の変更 | transform_rename_keys | 全行のキーを一括改名 |
| 並べ替え | transform_sort | 昇順 / 降順 / ランダム(重複除去・件数制限も) |
| 件数制限 | transform_limit | 先頭 / 末尾 N 件 |
| 重複削除 | transform_dedup | 指定列で重複除去 |
| 行に分割 | transform_split | 1 行を複数行に展開 |
| 集計・グループ化 | transform_aggregate | グループ化 + 集約関数 |
| データ結合 | transform_merge | append / inner / left / outer 結合 |
| データ比較 | transform_compare | 2 つのデータの差分検出 |
| 型変換 | transform_cast | integer / float / boolean / date / datetime / string |
| データ検証 | transform_validate | ルール検証(flag / reject) |
ユーティリティ(AI・コード)
| 表示名 | キー | 役割 |
|---|---|---|
| AI 変換 | transform_ai | map / filter / reduce / classify を LLM で実行 |
| 画像を AI で読み取り | transform_image_describe | 画像を VLM(失敗時 OCR)で説明文に変換 |
| Python 関数 | transform_code | サンドボックスで transform(data) を実行 |
| シェルコマンド | transform_execute_command | コマンド実行し標準出力を行に付与 |
| 日時操作 | transform_datetime | フォーマット / 加減算 / 抽出 / 差分 |
| 暗号化・ハッシュ | transform_crypto | hash / base64 / URL encode / hmac |
| Markdown / HTML | transform_markdown | Markdown ↔ HTML 変換 |
| XML 処理 | transform_xml | XML ↔ dict 変換 |
条件分岐・繰り返し
| 表示名 | キー | 役割 |
|---|---|---|
| 条件分岐 (IF) | flow_if | true / false に振り分け |
| 分岐 (Switch) | flow_switch | 名前つき出力へ多分岐 |
| ループ | flow_loop | バッチ分割で逐次 / 並列処理 |
| 待機 | transform_wait | 指定秒スリープ |
| 終了 (中断) | flow_break | 条件成立でループを抜ける |
業務システム連携(ERP / SFA)
| 表示名 | キー | 役割 |
|---|---|---|
| Salesforce 取得 / 登録 | source_salesforce / load_salesforce | SOQL 取得 / Composite API 書込(insert / upsert) |
| HubSpot 取得 | source_hubspot | CRM オブジェクト取得 |
| kintone 取得 / 登録 | source_kintone / load_kintone | バルク読込 / insert・upsert |
| freee 取得 | source_freee | freee 会計リソース |
| SmartHR 取得 | source_smarthr | 従業員 / 部署など |
| Sansan 取得 | source_sansan | 名刺 / 連絡先 |
| Money Forward 取得 | source_moneyforward | クラウド会計リソース |
| Backlog 取得 | source_backlog | issue / project / wiki / user |
ログ・モニタリング
| 表示名 | キー | 役割 |
|---|---|---|
| Elasticsearch 取得 / 書込 | source_elasticsearch / load_elasticsearch | クエリ検索 / _bulk 書込 |
| Zabbix 取得 | source_zabbix | 監視データ取得 |
| ログ取得 | source_logs | アプリ / 監査ログを取得 |
| 各種メトリクス | source_pipeline_metrics ほか | パイプライン / チャット / ヘルプデスク / システムの統計 |
その他の連携(SaaS / 通知)
グループ / Google / 広告 / EC・決済 / 通知 / DevOps 向けに、次の取得・送信オペレータも利用できます(環境により表示が異なります)。
Garoon / Google Sheets(取得・書込)/ Gmail / Google Analytics 4 / Google Ads / Yahoo! 広告 / Shopify / Stripe / Slack 送信 / Teams 送信 / Chatwork 送信 / LINE WORKS 送信 / LINE 送信 / Discord 送信 / Telegram 送信 / Zoom 録画取得 / GitHub / GitLab / Jira(取得・Issue 作成)/ Notion(取得・書込)。
S3 / GCS / SFTP / FTP / SMB / Box / Google Drive / OneDrive / Dropbox / Azure Blob / SharePoint などのファイル転送は、独立オペレータではなく ファイル取得 / 保存(source_file / load_file)に接続 URI を指定して扱います。接続の作り方はファイル / 外部ストレージを参照してください。
主要オペレータのパラメータ
AI 変換(transform_ai)
| 項目 | 既定 | 説明 |
|---|---|---|
| 操作 | map | map(各行を変換)/ filter(残す行を判定)/ reduce(グループ集約)/ classify(分類) |
| モデル | 文脈の既定 | 使う LLM |
| プロンプト / システムプロンプト | — | 指示文 |
| 出力列 | ai_output(classify は _label) | 結果を入れる列 |
| 分類カテゴリ | — | classify のラベル候補。multi_label で複数付与 |
| temperature ほか | 0.7(classify は 0.1) | チャットと同じ生成パラメータ |
Python 関数(transform_code)
transform(data) を定義します。data は行の配列です。mode は all(全行を一度に)/ each(1 行ずつ)。サンドボックスで実行され、ファイル・ネットワーク・危険な属性アクセスは禁止、timeout_seconds(既定 30)で打ち切られます。
ファイル取得 / 保存(source_file / load_file)
取得は file_path(フォルダ末尾 /・glob・接続 URI・仮想パス myfile/・shared/・external/<label>)、format(未指定なら拡張子で判定)、encoding、delimiter、max_rows など。保存は file_path(必須)、format(既定 csv)、append、compress(.gz)。
SQL 取得 / 保存(source_sql / load_sql)
取得は接続(connectionId)+ query(SELECT / WITH)または table + columns / where / order_by / limit、差分取得(incremental)。保存は table(必須)、write_disposition(append / replace / merge)、primary_key(merge 用)、batch_size(既定 1000)。
RAG に保存 / 検索(load_to_rag / source_rag_retrieve)
保存は bot_id(必須)、mode(append / replace_all)、chunk_size(600)、chunk_overlap(100)。検索は bot_id(必須)、query、top_k(5)、similarity_threshold。
条件の演算子(filter / IF / break 共通)
== != > >= < <= / contains not_contains starts_with ends_with / regex / exists not_exists / is_empty is_not_empty。match で AND(all)/ OR(any)、case_sensitive の切り替え。
集約関数(transform_aggregate)
count / sum / avg / min / max / count_unique / concat / first / last / collect。group_by 空で全体集約。
代表的なパイプライン例
- 社内 DB をナレッジ化: SQL 取得 → AI 変換(要約) → RAG に保存
- ファイル取込 → 蓄積: ファイル取得(CSV glob) → 型変換 → データ検証 → DataLake に保存
- API → BigQuery: REST API 取得 → 絞り込み → 集計 → BigQuery 書込
- 差分監視 → 画像読取 → 通知: ファイル差分取得 → 画像を AI で読み取り → Slack 送信
- CRM 連携: kintone 取得 → データ比較(差分) → 条件分岐 → Salesforce 登録
- 監視 → 分類 → メール: メトリクス取得 → AI 変換(障害分類) → 絞り込み → メール送信
関連
- ファイル / 外部ストレージ — 接続 URI とファイルの扱い
- SQL / BI — Data Lake と BigQuery
- ナレッジ(RAG)