DigitalBase Docs (β版)製品サイト

パイプライン(ETL)

データの取り込み・変換・出力をステップで組み合わせて自動化します。複雑な統合ツールではなく、シンプルな UI で AI とのデータ連携に特化しています。

仕組み

  • ステップを上から順に実行します。データは**行の集まり(list[dict])**として各ステップを流れます。
  • ステップは大きく 取得(source)→ 加工(transform)→ 出力(load / action) に分かれます。
  • 外部サービスの認証情報は 接続管理(ApiConnection) に保存し、ステップから参照します(他人の接続は管理者以外使えません)。
  • 値の差し込みにはテンプレートが使えます: {{ $json.列名 }} / {{ $today }} / {{ $workflow.name }} / {{ $env.変数名 }} / {{ $node["ステップ名"].json[0].列 }}

共通の設定(全ステップ)

各ステップには、失敗時の挙動を制御する共通オプションがあります。

項目既定説明
リトライoff失敗時に再試行する
最大リトライ回数3再試行の回数
リトライ間隔(ms)1000再試行までの待ち時間
エラー時の挙動stopstop(中断)/ continue(無視して継続)/ continue_with_error_output(エラー行を出力して継続)

オペレータ一覧

ファイル・API 連携

表示名キー役割
ファイル取得source_fileCSV / Excel / JSON / JSONL / PDF / TSV / DOCX などを読込(フォルダ・glob・接続 URI・仮想パス対応)
ファイル保存load_fileCSV / TSV / JSON / JSONL / MD / Excel / gzip で書き出し
ファイル差分取得source_file_landed前回成功以降に届いた新規ファイルだけを取得(差分トリガ)
RSS 取得source_rssRSS / Atom フィードを取得
REST API 取得source_restREST をページング・認証つきで取得
REST API 送信action_http_request各行ごとに HTTP リクエストを送信し、応答を行に付与
HTTP ファイル取得source_http_filesHTTP の一覧ページ / 単一 URL からファイル取得
Webhook 受信source_webhookWebhook 受信を有効化
Webhook 送信action_webhookWebhook URL へ署名・リトライつき送信
rsync 取得 / 送信source_rsync / load_rsyncリモートと rsync で同期
メール送信action_emailSMTP でメール送信

SQL・データ蓄積

表示名キー役割
SQL 取得 (SELECT)source_sqlSELECT / WITH のみ。クエリ直書き or テーブル+条件。差分取得対応
SQL 保存 (INSERT)load_sqlappend / replace / merge。テーブル自動作成・バッチ投入
SQL 更新 (UPDATE)update_sql主キーで行単位 UPDATE
SQL 実行 (任意)execute_sql任意の DDL / DML を実行
BigQuery 取得 / 書込source_bigquery / load_bigqueryBigQuery をクエリ / ストリーム挿入(SA JSON 認証)
ClickHouse 取得 / 書込source_clickhouse / load_clickhouseClickHouse の読み書き
DataLake に保存load_postgres_dataset内蔵 Data Lake に保存(SQL / BI や BI キーで参照)
RAG に保存load_to_rag行をまとめて 1 文書としてナレッジに埋め込み
RAG から検索source_rag_retrieveナレッジから類似チャンクを取得

データ加工

表示名キー役割
行の絞り込みtransform_filter条件で行を残す / 除く
列の編集transform_set列の設定 / 改名 / 削除 / 指定列だけ残す
列名の変更transform_rename_keys全行のキーを一括改名
並べ替えtransform_sort昇順 / 降順 / ランダム(重複除去・件数制限も)
件数制限transform_limit先頭 / 末尾 N 件
重複削除transform_dedup指定列で重複除去
行に分割transform_split1 行を複数行に展開
集計・グループ化transform_aggregateグループ化 + 集約関数
データ結合transform_mergeappend / inner / left / outer 結合
データ比較transform_compare2 つのデータの差分検出
型変換transform_castinteger / float / boolean / date / datetime / string
データ検証transform_validateルール検証(flag / reject)

ユーティリティ(AI・コード)

表示名キー役割
AI 変換transform_aimap / filter / reduce / classify を LLM で実行
画像を AI で読み取りtransform_image_describe画像を VLM(失敗時 OCR)で説明文に変換
Python 関数transform_codeサンドボックスで transform(data) を実行
シェルコマンドtransform_execute_commandコマンド実行し標準出力を行に付与
日時操作transform_datetimeフォーマット / 加減算 / 抽出 / 差分
暗号化・ハッシュtransform_cryptohash / base64 / URL encode / hmac
Markdown / HTMLtransform_markdownMarkdown ↔ HTML 変換
XML 処理transform_xmlXML ↔ dict 変換

条件分岐・繰り返し

表示名キー役割
条件分岐 (IF)flow_iftrue / false に振り分け
分岐 (Switch)flow_switch名前つき出力へ多分岐
ループflow_loopバッチ分割で逐次 / 並列処理
待機transform_wait指定秒スリープ
終了 (中断)flow_break条件成立でループを抜ける

業務システム連携(ERP / SFA)

表示名キー役割
Salesforce 取得 / 登録source_salesforce / load_salesforceSOQL 取得 / Composite API 書込(insert / upsert)
HubSpot 取得source_hubspotCRM オブジェクト取得
kintone 取得 / 登録source_kintone / load_kintoneバルク読込 / insert・upsert
freee 取得source_freeefreee 会計リソース
SmartHR 取得source_smarthr従業員 / 部署など
Sansan 取得source_sansan名刺 / 連絡先
Money Forward 取得source_moneyforwardクラウド会計リソース
Backlog 取得source_backlogissue / project / wiki / user

ログ・モニタリング

表示名キー役割
Elasticsearch 取得 / 書込source_elasticsearch / load_elasticsearchクエリ検索 / _bulk 書込
Zabbix 取得source_zabbix監視データ取得
ログ取得source_logsアプリ / 監査ログを取得
各種メトリクスsource_pipeline_metrics ほかパイプライン / チャット / ヘルプデスク / システムの統計

その他の連携(SaaS / 通知)

グループ / Google / 広告 / EC・決済 / 通知 / DevOps 向けに、次の取得・送信オペレータも利用できます(環境により表示が異なります)。

Garoon / Google Sheets(取得・書込)/ Gmail / Google Analytics 4 / Google Ads / Yahoo! 広告 / Shopify / Stripe / Slack 送信 / Teams 送信 / Chatwork 送信 / LINE WORKS 送信 / LINE 送信 / Discord 送信 / Telegram 送信 / Zoom 録画取得 / GitHub / GitLab / Jira(取得・Issue 作成)/ Notion(取得・書込)。

補足

S3 / GCS / SFTP / FTP / SMB / Box / Google Drive / OneDrive / Dropbox / Azure Blob / SharePoint などのファイル転送は、独立オペレータではなく ファイル取得 / 保存source_file / load_file)に接続 URI を指定して扱います。接続の作り方はファイル / 外部ストレージを参照してください。


主要オペレータのパラメータ

AI 変換(transform_ai

項目既定説明
操作mapmap(各行を変換)/ filter(残す行を判定)/ reduce(グループ集約)/ classify(分類)
モデル文脈の既定使う LLM
プロンプト / システムプロンプト指示文
出力列ai_output(classify は _label)結果を入れる列
分類カテゴリclassify のラベル候補。multi_label で複数付与
temperature ほか0.7(classify は 0.1)チャットと同じ生成パラメータ

Python 関数(transform_code

transform(data) を定義します。data は行の配列です。modeall(全行を一度に)/ each(1 行ずつ)。サンドボックスで実行され、ファイル・ネットワーク・危険な属性アクセスは禁止、timeout_seconds(既定 30)で打ち切られます。

ファイル取得 / 保存(source_file / load_file

取得は file_path(フォルダ末尾 /・glob・接続 URI・仮想パス myfile/shared/external/<label>)、format(未指定なら拡張子で判定)、encodingdelimitermax_rows など。保存は file_path(必須)、format(既定 csv)、appendcompress(.gz)。

SQL 取得 / 保存(source_sql / load_sql

取得は接続(connectionId)+ query(SELECT / WITH)または tablecolumns / where / order_by / limit、差分取得(incremental)。保存は table(必須)、write_disposition(append / replace / merge)、primary_key(merge 用)、batch_size(既定 1000)。

RAG に保存 / 検索(load_to_rag / source_rag_retrieve

保存は bot_id(必須)、mode(append / replace_all)、chunk_size(600)、chunk_overlap(100)。検索は bot_id(必須)、querytop_k(5)、similarity_threshold

条件の演算子(filter / IF / break 共通)

== != > >= < <= / contains not_contains starts_with ends_with / regex / exists not_exists / is_empty is_not_emptymatch で AND(all)/ OR(any)、case_sensitive の切り替え。

集約関数(transform_aggregate

count / sum / avg / min / max / count_unique / concat / first / last / collectgroup_by 空で全体集約。


代表的なパイプライン例

  1. 社内 DB をナレッジ化: SQL 取得 → AI 変換(要約) → RAG に保存
  2. ファイル取込 → 蓄積: ファイル取得(CSV glob) → 型変換 → データ検証 → DataLake に保存
  3. API → BigQuery: REST API 取得 → 絞り込み → 集計 → BigQuery 書込
  4. 差分監視 → 画像読取 → 通知: ファイル差分取得 → 画像を AI で読み取り → Slack 送信
  5. CRM 連携: kintone 取得 → データ比較(差分) → 条件分岐 → Salesforce 登録
  6. 監視 → 分類 → メール: メトリクス取得 → AI 変換(障害分類) → 絞り込み → メール送信

関連