blob: b70b7b613313e57f2faef711c1dc7c2f7c32f82d [file] [log] [blame]
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'package:async/async.dart' hide Result;
import 'package:meta/meta.dart';
import 'package:stream_channel/stream_channel.dart';
import '../api/api.dart';
import '../shared.dart';
part 'elicitation_support.dart';
part 'roots_support.dart';
part 'sampling_support.dart';
/// The base class for MCP clients.
///
/// Can be directly constructed or extended with additional classes.
///
/// Adding [capabilities] is done through additional support mixins such as
/// [RootsSupport].
///
/// Override the [initialize] function to perform setup logic inside mixins,
/// this will be invoked at the end of base class constructor.
base class MCPClient {
/// A description of the client sent to servers during initialization.
final Implementation implementation;
MCPClient(this.implementation) {
initialize();
}
/// Lifecycle method called in the base class constructor.
///
/// Used to modify the [capabilities] of this client from mixins, or perform
/// any other initialization that is required.
void initialize() {}
/// The capabilities of this client.
///
/// This can be modified by overriding the [initialize] method.
final ClientCapabilities capabilities = ClientCapabilities();
@visibleForTesting
final Set<ServerConnection> connections = {};
/// Connect to a new MCP server over [stdin] and [stdout].
///
/// If [protocolLogSink] is provided, all messages sent between the client and
/// server will be forwarded to that [Sink] as well, with `<<<` preceding
/// incoming messages and `>>>` preceding outgoing messages. It is the
/// responsibility of the caller to close this sink.
///
/// If [onDone] is passed, it will be invoked when the connection shuts down.
ServerConnection connectStdioServer(
StreamSink<List<int>> stdin,
Stream<List<int>> stdout, {
Sink<String>? protocolLogSink,
void Function()? onDone,
}) {
final channel = StreamChannel.withCloseGuarantee(stdout, stdin)
.transform(StreamChannelTransformer.fromCodec(utf8))
.transformStream(const LineSplitter())
.transformSink(
StreamSinkTransformer.fromHandlers(
handleData: (data, sink) {
sink.add('$data\n');
},
),
);
final connection = connectServer(channel, protocolLogSink: protocolLogSink);
if (onDone != null) connection.done.then((_) => onDone());
return connection;
}
/// Returns a connection for an MCP server using a [channel], which is already
/// established.
///
/// Each [String] sent over [channel] represents an entire JSON request or
/// response.
///
/// If [protocolLogSink] is provided, all messages sent on [channel] will be
/// forwarded to that [Sink] as well, with `<<<` preceding incoming messages
/// and `>>>` preceding outgoing messages. It is the responsibility of the
/// caller to close this sink.
ServerConnection connectServer(
StreamChannel<String> channel, {
Sink<String>? protocolLogSink,
}) {
// For type promotion in this function.
final self = this;
final connection = ServerConnection.fromStreamChannel(
channel,
protocolLogSink: protocolLogSink,
rootsSupport: self is RootsSupport ? self : null,
samplingSupport: self is SamplingSupport ? self : null,
elicitationSupport: self is ElicitationSupport ? self : null,
);
connections.add(connection);
channel.sink.done.then((_) => connections.remove(connection));
return connection;
}
/// Shuts down all active server connections.
Future<void> shutdown() async {
final connections = this.connections.toList();
this.connections.clear();
await Future.wait([
for (var connection in connections) connection.shutdown(),
]);
}
}
/// An active server connection.
base class ServerConnection extends MCPBase {
/// The version of the protocol that was negotiated during initialization.
///
/// Some APIs may error if you attempt to use them without first checking the
/// protocol version.
late ProtocolVersion protocolVersion;
/// The [Implementation] returned from the [initialize] request.
///
/// Only non-null after [initialize] has successfully completed.
Implementation? serverInfo;
/// The [ServerCapabilities] returned from the [initialize] request.
///
/// Only assigned after [initialize] has successfully completed.
late ServerCapabilities serverCapabilities;
@override
String get name => serverInfo?.name ?? super.name;
/// Emits an event any time the server notifies us of a change to the list of
/// prompts it supports.
///
/// This is a broadcast stream, events are not buffered and only future events
/// are given.
Stream<PromptListChangedNotification> get promptListChanged =>
_promptListChangedController.stream;
final _promptListChangedController =
StreamController<PromptListChangedNotification>.broadcast();
/// Emits an event any time the server notifies us of a change to the list of
/// tools it supports.
///
/// This is a broadcast stream, events are not buffered and only future events
/// are given.
Stream<ToolListChangedNotification> get toolListChanged =>
_toolListChangedController.stream;
final _toolListChangedController =
StreamController<ToolListChangedNotification>.broadcast();
/// Emits an event any time the server notifies us of a change to the list of
/// resources it supports.
///
/// This is a broadcast stream, events are not buffered and only future events
/// are given.
Stream<ResourceListChangedNotification> get resourceListChanged =>
_resourceListChangedController.stream;
final _resourceListChangedController =
StreamController<ResourceListChangedNotification>.broadcast();
/// Emits an event any time the server notifies us of a change to a resource
/// that this client has subscribed to.
///
/// This is a broadcast stream, events are not buffered and only future events
/// are given.
Stream<ResourceUpdatedNotification> get resourceUpdated =>
_resourceUpdatedController.stream;
final _resourceUpdatedController =
StreamController<ResourceUpdatedNotification>.broadcast();
/// Emits an event any time the server sends a log message.
///
/// This is a broadcast stream, events are not buffered and only future events
/// are given.
Stream<LoggingMessageNotification> get onLog => _logController.stream;
final _logController =
StreamController<LoggingMessageNotification>.broadcast();
/// A 1:1 connection from a client to a server using [channel].
///
/// If the client supports "roots", then it should provide an implementation
/// through [rootsSupport].
///
/// If the client supports "sampling", then it should provide an
/// implementation through [samplingSupport].
ServerConnection.fromStreamChannel(
super.channel, {
super.protocolLogSink,
RootsSupport? rootsSupport,
SamplingSupport? samplingSupport,
ElicitationSupport? elicitationSupport,
}) {
if (rootsSupport != null) {
registerRequestHandler(
ListRootsRequest.methodName,
rootsSupport.handleListRoots,
);
}
if (samplingSupport != null) {
registerRequestHandler(
CreateMessageRequest.methodName,
(CreateMessageRequest request) =>
samplingSupport.handleCreateMessage(request, serverInfo!),
);
}
if (elicitationSupport != null) {
registerRequestHandler(ElicitRequest.methodName, (ElicitRequest request) {
return elicitationSupport.handleElicitation(request);
});
}
registerNotificationHandler(
PromptListChangedNotification.methodName,
_promptListChangedController.sink.add,
);
registerNotificationHandler(
ToolListChangedNotification.methodName,
_toolListChangedController.sink.add,
);
registerNotificationHandler(
ResourceListChangedNotification.methodName,
_resourceListChangedController.sink.add,
);
registerNotificationHandler(
ResourceUpdatedNotification.methodName,
_resourceUpdatedController.sink.add,
);
registerNotificationHandler(
LoggingMessageNotification.methodName,
_logController.sink.add,
);
}
/// Close all connections and streams so the process can cleanly exit.
@override
Future<void> shutdown() async {
await Future.wait([
super.shutdown(),
_promptListChangedController.close(),
_toolListChangedController.close(),
_resourceListChangedController.close(),
_resourceUpdatedController.close(),
_logController.close(),
]);
}
/// Called after a successful call to [initialize].
void notifyInitialized([InitializedNotification? notification]) =>
sendNotification(InitializedNotification.methodName, notification);
/// Initializes the server, this should be done before anything else.
///
/// The client must call [notifyInitialized] after receiving and accepting
/// this response.
///
/// Throws a [StateError] if initialization fails for unknown reasons (usually
/// the server connection closes prematurely due to misconfiguration). To
/// debug these errors you should pass a `protocolLogSink` when creating these
/// connections.
Future<InitializeResult> initialize(InitializeRequest request) async {
final response = await sendRequest<InitializeResult>(
InitializeRequest.methodName,
request,
);
serverInfo = response.serverInfo;
serverCapabilities = response.capabilities;
final serverVersion = response.protocolVersion;
if (serverVersion == null || !serverVersion.isSupported) {
await shutdown();
} else {
protocolVersion = serverVersion;
}
return response;
}
/// List all the tools from this server.
Future<ListToolsResult> listTools([ListToolsRequest? request]) =>
sendRequest(ListToolsRequest.methodName, request);
/// Invokes a [Tool] returned from the [ListToolsResult].
Future<CallToolResult> callTool(CallToolRequest request) =>
sendRequest(CallToolRequest.methodName, request);
/// Lists all the [Resource]s from this server.
Future<ListResourcesResult> listResources(ListResourcesRequest request) =>
sendRequest(ListResourcesRequest.methodName, request);
/// Reads a [Resource] returned from the [ListResourcesResult] or matching
/// a [ResourceTemplate] from a [ListResourceTemplatesResult].
Future<ReadResourceResult> readResource(ReadResourceRequest request) =>
sendRequest(ReadResourceRequest.methodName, request);
/// Lists all the [ResourceTemplate]s from this server.
Future<ListResourceTemplatesResult> listResourceTemplates(
ListResourceTemplatesRequest request,
) => sendRequest(ListResourceTemplatesRequest.methodName, request);
/// Lists all the prompts from this server.
Future<ListPromptsResult> listPrompts(ListPromptsRequest request) =>
sendRequest(ListPromptsRequest.methodName, request);
/// Gets the requested [Prompt] from the server.
Future<GetPromptResult> getPrompt(GetPromptRequest request) =>
sendRequest(GetPromptRequest.methodName, request);
/// Subscribes this client to a resource by URI (at `request.uri`).
///
/// Updates will come on the [resourceUpdated] stream.
Future<void> subscribeResource(SubscribeRequest request) =>
sendRequest(SubscribeRequest.methodName, request);
/// Unsubscribes this client to a resource by URI (at `request.uri`).
///
/// Updates will come on the [resourceUpdated] stream.
Future<void> unsubscribeResource(UnsubscribeRequest request) =>
sendRequest(UnsubscribeRequest.methodName, request);
/// Sends a request to change the current logging level.
///
/// Completes when the response is received.
Future<void> setLogLevel(SetLevelRequest request) =>
sendRequest(SetLevelRequest.methodName, request);
/// Sends a request to get completions from the server.
///
/// Clients should debounce their calls to this API to avoid overloading the
/// server.
///
/// You should check the [protocolVersion] before using this API, it must be
/// >= [ProtocolVersion.v2025_03_26].
// TODO: Implement automatic debouncing.
Future<CompleteResult> requestCompletions(CompleteRequest request) =>
sendRequest(CompleteRequest.methodName, request);
}