Custom Data Sources
Guide to creating custom data source handlers.
Overview
Graphty's data source system is extensible. Create custom data sources to load graphs from APIs, databases, or specialized file formats.
DataSource Interface
All data sources extend the abstract DataSource class:
typescript
/**
 * Base class that every data source extends.
 *
 * A concrete subclass sets `type` and implements `load()`.
 *
 * NOTE(review): the examples in this guide also call `DataSource.register(...)`,
 * which is not declared in this sketch — confirm its signature against the real class.
 */
abstract class DataSource {
/** String identifier of the data source; the examples pass this same string to `graph.loadFromDataSource()`. */
static type: string;
/**
 * Produces graph data for the given configuration.
 *
 * The return type is an async generator, so an implementation may yield one
 * `GraphData` object or several chunks in sequence.
 */
abstract load(config: object): AsyncGenerator<GraphData>;
}
/** One unit of graph content yielded by a data source: a list of nodes and a list of edges. */
interface GraphData {
/** Node records contained in this unit. */
nodes: NodeData[];
/** Edge records contained in this unit. */
edges: EdgeData[];
}

Creating a Custom Data Source
Basic Example
typescript
import { DataSource, GraphData, NodeData, EdgeData } from "@graphty/graphty-element";
/**
 * Minimal data source that downloads a complete graph from a JSON endpoint.
 *
 * The endpoint is expected to answer with an object of the form
 * `{ nodes: [...], edges: [...] }`.
 */
class MyDataSource extends DataSource {
    static type = "my-api";

    /**
     * Fetches `config.url` and yields its contents as a single chunk.
     *
     * @param config - `url` is the endpoint to request.
     * @returns An async generator that yields exactly one `GraphData` object.
     * @throws Error when the server responds with a non-2xx status.
     */
    async *load(config: { url: string }): AsyncGenerator<GraphData> {
        const response = await fetch(config.url);
        // fetch() only rejects on network failure, so HTTP error statuses must be checked explicitly.
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }

        const data = await response.json();
        yield {
            nodes: data.nodes,
            edges: data.edges,
        };
    }
}
// Register the data source
DataSource.register(MyDataSource);

Using Your Data Source
typescript
// The first argument is the data source's `static type` string ("my-api");
// the second is the config object, shaped like the parameter of that source's `load()`.
await graph.loadFromDataSource("my-api", {
url: "https://api.example.com/graph",
});

Complete Example: REST API
typescript
import { DataSource, GraphData, NodeData, EdgeData } from "@graphty/graphty-element";
/** Configuration accepted by `RestApiDataSource.load()`. */
interface ApiConfig {
/** Root URL of the API; the `/graphs/...` request paths are appended to it. */
baseUrl: string;
/** Optional key; when set it is sent as a `Bearer` token in the `Authorization` header. */
apiKey?: string;
/** Identifier of the graph, interpolated into the request paths. */
graphId: string;
}
/** One node record as returned by the example API's `/nodes` endpoint. */
interface ApiNode {
    nodeId: string | number;
    name: string;
    attributes?: Record<string, unknown>;
}

/** One edge record as returned by the example API's `/edges` endpoint. */
interface ApiEdge {
    fromNode: string | number;
    toNode: string | number;
    weight: number;
}

/**
 * Requests `url` and returns the parsed JSON body.
 *
 * @throws Error when the server responds with a non-2xx status.
 */
async function fetchApiJson<T>(url: string, headers: Record<string, string>): Promise<T> {
    const response = await fetch(url, { headers });
    // fetch() only rejects on network failure, so HTTP error statuses must be checked explicitly.
    if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    return (await response.json()) as T;
}

/**
 * Data source that loads a graph from a REST API exposing separate
 * `/graphs/{graphId}/nodes` and `/graphs/{graphId}/edges` endpoints.
 */
class RestApiDataSource extends DataSource {
    static type = "rest-api";

    /**
     * Fetches nodes and edges, converts them to Graphty's format and yields
     * them as a single chunk.
     *
     * @param config - Base URL, graph id and optional API key.
     * @returns An async generator that yields exactly one `GraphData` object.
     * @throws Error when either request fails or returns a non-2xx status.
     */
    async *load(config: ApiConfig): AsyncGenerator<GraphData> {
        const headers: Record<string, string> = {
            "Content-Type": "application/json",
        };
        if (config.apiKey) {
            headers["Authorization"] = `Bearer ${config.apiKey}`;
        }

        // The two requests are independent, so issue them in parallel.
        const graphUrl = `${config.baseUrl}/graphs/${config.graphId}`;
        const [nodesData, edgesData] = await Promise.all([
            fetchApiJson<ApiNode[]>(`${graphUrl}/nodes`, headers),
            fetchApiJson<ApiEdge[]>(`${graphUrl}/edges`, headers),
        ]);

        // Transform to Graphty format. `attributes` is spread last, so an
        // attribute named `id` or `label` overrides the mapped value.
        const nodes: NodeData[] = nodesData.map((n) => ({
            id: n.nodeId,
            label: n.name,
            ...n.attributes,
        }));
        const edges: EdgeData[] = edgesData.map((e) => ({
            source: e.fromNode,
            target: e.toNode,
            weight: e.weight,
        }));

        yield { nodes, edges };
    }
}
DataSource.register(RestApiDataSource);

Usage:
typescript
// "rest-api" is the `static type` of RestApiDataSource; the object matches its `ApiConfig` parameter.
await graph.loadFromDataSource("rest-api", {
baseUrl: "https://api.example.com",
// Optional: omit `apiKey` to send the requests without an Authorization header.
apiKey: "your-api-key",
graphId: "graph-123",
});

Chunked Loading
For large datasets, yield data in chunks:
typescript
/**
 * Data source that walks a paginated endpoint and yields one chunk per page.
 *
 * Each page is expected to be an object of the form
 * `{ nodes: [...], edges: [...], hasNextPage: boolean }`.
 */
class PaginatedDataSource extends DataSource {
    static type = "paginated";

    /**
     * Requests pages starting at page 0 until the server reports no further page.
     *
     * @param config - `url` is the endpoint; `pageSize` is sent as the `size` query parameter.
     * @returns An async generator yielding one `GraphData` chunk per page.
     * @throws Error when a page request returns a non-2xx status.
     */
    async *load(config: { url: string; pageSize: number }): AsyncGenerator<GraphData> {
        // Append with "&" when the base URL already carries a query string.
        const separator = config.url.includes("?") ? "&" : "?";
        let page = 0;
        let hasMore = true;

        while (hasMore) {
            const response = await fetch(`${config.url}${separator}page=${page}&size=${config.pageSize}`);
            // fetch() only rejects on network failure, so HTTP error statuses must be checked explicitly.
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }
            const data = await response.json();

            yield {
                nodes: data.nodes,
                edges: data.edges,
            };

            // A missing flag is treated as "no more pages" so the loop always terminates.
            hasMore = Boolean(data.hasNextPage);
            page++;
        }
    }
}
DataSource.register(PaginatedDataSource);

This progressively loads the graph, with the UI updating after each chunk.
Streaming Data Source
For real-time data streams:
typescript
/**
 * Data source that streams graph updates from a WebSocket.
 *
 * Each message is expected to be a JSON object with optional `nodes` and
 * `edges` arrays; every message becomes one yielded chunk.
 */
class WebSocketDataSource extends DataSource {
    static type = "websocket";

    /**
     * Connects to `config.wsUrl` and yields a chunk for every message received
     * until the socket closes.
     *
     * @param config - `wsUrl` is the WebSocket endpoint to connect to.
     * @returns An async generator that completes once the socket has closed
     *          and every queued message has been yielded.
     * @throws Error when the connection closes before it was ever opened.
     */
    async *load(config: { wsUrl: string }): AsyncGenerator<GraphData> {
        const ws = new WebSocket(config.wsUrl);

        // Messages received while the consumer is busy are buffered here.
        const queue: GraphData[] = [];
        const state: { opened: boolean; closed: boolean } = { opened: false, closed: false };
        let wake: (() => void) | null = null;

        // Resumes the generator if it is currently waiting for a socket event.
        const notify = (): void => {
            const pending = wake;
            wake = null;
            pending?.();
        };
        // Resolves on the next open / message / close event.
        const nextEvent = (): Promise<void> =>
            new Promise<void>((r) => {
                wake = r;
            });

        ws.onopen = () => {
            state.opened = true;
            notify();
        };
        ws.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                queue.push({
                    nodes: Array.isArray(data?.nodes) ? data.nodes : [],
                    edges: Array.isArray(data?.edges) ? data.edges : [],
                });
            } catch (error) {
                // One malformed message must not take the whole stream down.
                console.warn("Ignoring malformed WebSocket message:", error);
                return;
            }
            notify();
        };
        // Without this wake-up the generator would wait forever on a failed
        // connection or a socket that closes while no message is pending.
        ws.onclose = () => {
            state.closed = true;
            notify();
        };

        try {
            // Wait for the connection to open (or fail).
            while (!state.opened && !state.closed) {
                await nextEvent();
            }
            if (!state.opened) {
                throw new Error(`WebSocket connection to ${config.wsUrl} failed`);
            }

            // Yield data as it arrives; drain the queue before honouring a close.
            for (;;) {
                const next = queue.shift();
                if (next !== undefined) {
                    yield next;
                    continue;
                }
                if (state.closed) {
                    break;
                }
                await nextEvent();
            }
        } finally {
            ws.close();
        }
    }
}
DataSource.register(WebSocketDataSource);

Custom File Format
Parse a custom file format:
typescript
/**
 * Data source that parses a simple sectioned text format.
 *
 * - `[NODES]` starts the node section; each line is `id:label`.
 * - `[EDGES]` starts the edge section; each line is `source,target[,weight]`.
 * - Blank lines and lines starting with `#` are ignored.
 */
class CustomFormatDataSource extends DataSource {
    static type = "custom-format";

    /**
     * Parses `config.content` and yields the whole graph as a single chunk.
     *
     * @param config - `content` is the full text of the file.
     * @returns An async generator that yields exactly one `GraphData` object.
     */
    async *load(config: { content: string }): AsyncGenerator<GraphData> {
        const nodes: NodeData[] = [];
        const edges: EdgeData[] = [];
        let section: "none" | "nodes" | "edges" = "none";

        for (const line of config.content.split("\n")) {
            const trimmed = line.trim();

            if (trimmed === "[NODES]") {
                section = "nodes";
                continue;
            }
            if (trimmed === "[EDGES]") {
                section = "edges";
                continue;
            }
            if (!trimmed || trimmed.startsWith("#")) {
                continue;
            }

            if (section === "nodes") {
                // Split on the first ":" only, so labels may themselves contain colons.
                const colon = trimmed.indexOf(":");
                const id = colon === -1 ? trimmed : trimmed.slice(0, colon).trim();
                const label = colon === -1 ? undefined : trimmed.slice(colon + 1).trim();
                nodes.push({ id, label });
            } else if (section === "edges") {
                const [source, target, weight] = trimmed.split(",").map((field) => field.trim());
                if (!source || !target) {
                    console.warn(`Skipping malformed edge line: ${trimmed}`);
                    continue;
                }
                // Default to 1 only when the weight is missing or not a number;
                // an explicit weight of 0 must be preserved.
                const parsedWeight = Number.parseFloat(weight ?? "");
                edges.push({
                    source,
                    target,
                    weight: Number.isNaN(parsedWeight) ? 1 : parsedWeight,
                });
            }
        }

        yield { nodes, edges };
    }
}
DataSource.register(CustomFormatDataSource);

Example custom format file:
# My Graph
[NODES]
a:Node A
b:Node B
c:Node C
[EDGES]
a,b,1.5
b,c,2.0
c,a,0.5

Schema Validation
Use Zod for input validation:
typescript
import { z } from "zod";
import { DataSource, GraphData } from "@graphty/graphty-element";
/**
 * Runtime schema for the configuration accepted by `ValidatedDataSource`.
 *
 * - `url`: must be a valid URL.
 * - `timeout`: per-attempt timeout in milliseconds; must be positive (default 5000).
 * - `retries`: total number of attempts; must be an integer of at least 1 (default 3).
 */
const ConfigSchema = z.object({
    url: z.string().url(),
    // `.default()` already makes a key optional on input, so `.optional()` is not needed.
    timeout: z.number().positive().default(5000),
    retries: z.number().int().min(1).default(3),
});

/** Validated configuration, with defaults applied. */
type Config = z.infer<typeof ConfigSchema>;
/**
 * Fetches `url` once, aborting the request if it takes longer than `timeoutMs`.
 *
 * @throws Error on network failure, timeout, or a non-2xx status.
 */
async function fetchGraphWithTimeout(url: string, timeoutMs: number): Promise<GraphData> {
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
    try {
        const response = await fetch(url, { signal: controller.signal });
        // fetch() only rejects on network failure, so HTTP error statuses must be checked explicitly.
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        const data = await response.json();
        return { nodes: data.nodes, edges: data.edges };
    } finally {
        // Always release the timer, including when the request throws.
        clearTimeout(timeoutId);
    }
}

/**
 * Data source that validates its configuration with Zod, then fetches the
 * graph with a per-attempt timeout and a bounded number of attempts.
 */
class ValidatedDataSource extends DataSource {
    static type = "validated";

    /**
     * Validates `config`, then tries to fetch the graph up to `retries` times.
     *
     * @param config - Untrusted input; parsed against `ConfigSchema`.
     * @returns An async generator that yields exactly one `GraphData` object.
     * @throws ZodError when `config` does not match the schema.
     * @throws Error the last failure, when every attempt failed.
     */
    async *load(config: unknown): AsyncGenerator<GraphData> {
        // Validate config
        const validConfig = ConfigSchema.parse(config);
        // Always make at least one attempt, even if `retries` is 0 or negative.
        const attempts = Math.max(1, validConfig.retries);

        let lastError: Error | null = null;
        for (let attempt = 0; attempt < attempts; attempt++) {
            let chunk: GraphData | null = null;
            try {
                chunk = await fetchGraphWithTimeout(validConfig.url, validConfig.timeout);
            } catch (error) {
                lastError = error instanceof Error ? error : new Error(String(error));
            }

            // Yield outside the try block so an error raised by the consumer
            // is not mistaken for a failed request and retried.
            if (chunk) {
                yield chunk;
                return;
            }
        }

        throw lastError ?? new Error("Failed to load data");
    }
}
DataSource.register(ValidatedDataSource);

Error Handling
Handle errors gracefully:
typescript
/**
 * Data source that loads from several URLs and tolerates individual failures.
 *
 * Each URL that loads successfully is yielded as its own chunk; failures are
 * logged and collected.
 */
class RobustDataSource extends DataSource {
    static type = "robust";

    /**
     * Loads every URL in order, yielding partial results as they arrive.
     *
     * @param config - `urls` lists the endpoints to load.
     * @returns An async generator yielding one `GraphData` chunk per successful URL.
     * @throws AggregateError when no URL could be loaded and at least one failed.
     */
    async *load(config: { urls: string[] }): AsyncGenerator<GraphData> {
        const errors: Error[] = [];
        // Count successful loads rather than nodes, so a source that is
        // legitimately empty is not reported as a failure.
        let loadedCount = 0;

        for (const url of config.urls) {
            let chunk: GraphData | null = null;

            try {
                const response = await fetch(url);
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
                }

                const data = await response.json();
                if (!Array.isArray(data.nodes) || !Array.isArray(data.edges)) {
                    throw new Error("Invalid data format");
                }

                chunk = { nodes: data.nodes, edges: data.edges };
            } catch (error) {
                errors.push(error instanceof Error ? error : new Error(String(error)));
                console.warn(`Failed to load ${url}:`, error);
            }

            // Yield partial results outside the try block so an error raised
            // by the consumer is not recorded as a failed download.
            if (chunk) {
                loadedCount++;
                yield chunk;
            }
        }

        if (loadedCount === 0 && errors.length > 0) {
            throw new AggregateError(errors, "All data sources failed");
        }
    }
}
DataSource.register(RobustDataSource);

Configuration Types
Export your config type for TypeScript users:
typescript
// my-data-source.ts
/**
 * Configuration accepted by `MyDataSource.load()`.
 *
 * NOTE(review): the example's `load()` body is elided, so the effect of each
 * field is not shown here — document the fields against the real implementation.
 */
export interface MyDataSourceConfig {
url: string;
apiKey?: string;
options?: {
includeMetadata?: boolean;
maxNodes?: number;
};
}
/** Skeleton data source whose `load()` parameter is typed with the exported `MyDataSourceConfig`. */
class MyDataSource extends DataSource {
static type = "my-source";
// Typing `config` with the exported interface lets callers type-check the object they pass in.
async *load(config: MyDataSourceConfig): AsyncGenerator<GraphData> {
// ...
}
}

Usage with type checking:
typescript
import type { MyDataSourceConfig } from "./my-data-source";
// Annotating the object with the imported type makes the compiler check its fields.
const config: MyDataSourceConfig = {
url: "https://api.example.com/graph",
options: { maxNodes: 1000 },
};
// "my-source" is the `static type` declared by MyDataSource.
await graph.loadFromDataSource("my-source", config);