Anthropic
MockAnthropic provides a local mock server for simulating Anthropic API endpoints. It simplifies testing by allowing you to define request expectations and responses without making real network calls.
Quick Start
Add Dependency
Include the library in your test dependencies (Maven or Gradle).
Gradle:
implementation("me.kpavlov.aimocks:ai-mocks-anthropic-jvm:$latestVersion")
Maven:
<dependency>
    <groupId>me.kpavlov.aimocks</groupId>
    <artifactId>ai-mocks-anthropic-jvm</artifactId>
    <version>[LATEST_VERSION]</version>
</dependency>
Initialize the Server
val anthropic = MockAnthropic(verbose = true)
- The server will start on a random free port by default.
- You can retrieve the server's base URL via anthropic.baseUrl().
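For readers unfamiliar with the "random free port" idea, the sketch below shows one common way to obtain such a port on the JVM: bind to port 0 and let the operating system choose. It is an illustration only, not MockAnthropic's implementation. The helper name findFreePort and the http://localhost:PORT URL shape are assumptions made for this example.

```kotlin
import java.net.ServerSocket

// Bind to port 0 so the OS assigns any currently free port, then release it.
// Hypothetical helper for illustration; not part of the ai-mocks API.
fun findFreePort(): Int = ServerSocket(0).use { it.localPort }

fun main() {
    val port = findFreePort()
    // Assumed URL shape; the value returned by anthropic.baseUrl() may differ.
    val baseUrl = "http://localhost:$port"
    println(baseUrl)
}
```

Because the port changes between runs, tests should always read the address from anthropic.baseUrl() rather than hard-coding it.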
Configure Requests and Responses
Here's an example that sets up a mock "messages" endpoint and defines the response:
anthropic.messages {
    temperature = 0.42
    model = "claude-3-7-sonnet-latest"
    maxTokens = 100
    topP = 0.95
    topK = 40
    userId = "user123"
    systemMessageContains("helpful assistant")
    userMessageContains("say 'Hello!'")
} responds {
    messageId = "msg_1234567890"
    assistantContent = "Hello" // response content
    delay = 200.milliseconds // simulate delay
    stopReason = "end_turn" // reason for stopping
}
- The messages { ... } block sets how the incoming request must look.
- The responds { ... } block defines what the mock server returns.
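To make the request expectations concrete, here is a sketch of a raw HTTP request that should satisfy every matcher in the example above, built with the JDK HTTP client instead of an SDK. The JSON field names follow the public Anthropic Messages API. The /v1/messages path, the header set, the helper name buildMessagesRequest, and the localhost:8080 address are assumptions for illustration; in a real test the base URL would come from anthropic.baseUrl().

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Body whose fields line up with the matchers: temperature, model, maxTokens,
// topP, topK, userId (sent as metadata.user_id), and the two "contains" checks.
val requestBody = """
    {
      "model": "claude-3-7-sonnet-latest",
      "max_tokens": 100,
      "temperature": 0.42,
      "top_p": 0.95,
      "top_k": 40,
      "metadata": { "user_id": "user123" },
      "system": "You are a helpful assistant.",
      "messages": [
        { "role": "user", "content": "Just say 'Hello!' and nothing else" }
      ]
    }
""".trimIndent()

// Hypothetical helper: builds (but does not send) the POST request.
fun buildMessagesRequest(baseUrl: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/v1/messages")) // assumed path
        .header("content-type", "application/json")
        .header("x-api-key", "my-anthropic-api-key")
        .header("anthropic-version", "2023-06-01")
        .POST(HttpRequest.BodyPublishers.ofString(requestBody))
        .build()

fun main() {
    // Placeholder address; use anthropic.baseUrl() in a real test.
    val request = buildMessagesRequest("http://localhost:8080")
    println("${request.method()} ${request.uri()}")
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the mock server is running.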
Calling Anthropic API Client
Here's an example that sets up and calls the official Anthropic SDK client:
// create Anthropic SDK client
val client =
    AnthropicOkHttpClient
        .builder()
        .apiKey("my-anthropic-api-key")
        .baseUrl(anthropic.baseUrl())
        .build()

// prepare Anthropic SDK call; the parameters mirror the mock's expectations
val params =
    MessageCreateParams
        .builder()
        .temperature(0.42)
        .maxTokens(100)
        .topP(0.95)
        .topK(40)
        .metadata(Metadata.builder().userId("user123").build())
        .system("You are a helpful assistant.")
        .addUserMessage("Just say 'Hello!' and nothing else")
        .model("claude-3-7-sonnet-latest")
        .build()

val result =
    client
        .messages()
        .create(params)

result
    .content()
    .first()
    .asText()
    .text() shouldBe "Hello" // kotest matcher
Streaming Responses
You can also configure streaming responses (such as chunked SSE events) for testing:
anthropic.messages {
    temperature = 0.7
    model = "claude-3-7-sonnet-latest"
    maxTokens = 150
    topP = 0.95
    topK = 40
    userId = "user123"
    systemMessageContains("person from 60s")
    userMessageContains("What do we need?")
} respondsStream {
    responseChunks = listOf("All", " we", " need", " is", " Love")
    delay = 50.milliseconds
    delayBetweenChunks = 10.milliseconds
    stopReason = "end_turn"
}
Or, you can use a flow to generate the response:
anthropic.messages("anthropic-messages-flow") {
    temperature = 0.7
    model = "claude-3-7-sonnet-latest"
    maxTokens = 150
    topP = 0.95
    topK = 40
    userId = "user123"
    systemMessageContains("person from 60s")
    userMessageContains("What do we need?")
} respondsStream {
    responseFlow =
        flow {
            emit("All")
            emit(" we")
            emit(" need")
            emit(" is")
            emit(" Love")
        }
    delay = 60.milliseconds
    delayBetweenChunks = 15.milliseconds
    stopReason = "end_turn"
}
Then call the Anthropic client:
val params =
    MessageCreateParams
        .builder()
        .temperature(0.7)
        .maxTokens(150)
        .topP(0.95)
        .topK(40)
        .metadata(Metadata.builder().userId("user123").build())
        .system("You are a person from 60s")
        .addUserMessage("What do we need?")
        .model("claude-3-7-sonnet-latest")
        .build()

val timedValue =
    measureTimedValue {
        client
            .messages()
            .createStreaming(params)
            .stream() // streaming
            .consumeAsFlow()
            .onStart { logger.info { "Started streaming" } }
            .onEach {
                logger
                    .info { it }
            }.onCompletion { logger.info { "Completed streaming" } }
            .count()
    }
timedValue.duration shouldBeLessThan 10.seconds
timedValue.value shouldBeLessThan 10
Use your Anthropic client to invoke the endpoint at anthropic.baseUrl(), and it will receive a streamed response.
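For orientation, the sketch below shows roughly what a streamed Messages response looks like on the wire: a sequence of server-sent events in which each text chunk travels in a content_block_delta event. The event names follow the public Anthropic streaming format, but the payloads are simplified, the helper names (jsonEscape, sse, toSseEvents) are hypothetical, and the exact events MockAnthropic emits may differ.

```kotlin
// Minimal JSON string escaping, sufficient for the plain-text chunks used here.
fun jsonEscape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n")

// Formats one server-sent event: an "event:" line, a "data:" line, a blank line.
fun sse(event: String, data: String): String = "event: $event\ndata: $data\n\n"

// Hypothetical helper: wraps text chunks in a simplified Anthropic-style event sequence.
fun toSseEvents(chunks: List<String>, stopReason: String): List<String> = buildList {
    add(sse("message_start", """{"type":"message_start","message":{"id":"msg_1234567890","type":"message","role":"assistant","content":[]}}"""))
    add(sse("content_block_start", """{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"""))
    for (chunk in chunks) {
        // One delta event per chunk, carrying the chunk as a text_delta.
        add(sse("content_block_delta", """{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"${jsonEscape(chunk)}"}}"""))
    }
    add(sse("content_block_stop", """{"type":"content_block_stop","index":0}"""))
    add(sse("message_delta", """{"type":"message_delta","delta":{"stop_reason":"$stopReason"}}"""))
    add(sse("message_stop", """{"type":"message_stop"}"""))
}

fun main() {
    val chunks = listOf("All", " we", " need", " is", " Love")
    toSseEvents(chunks, stopReason = "end_turn").forEach(::print)
}
```

Seen this way, responseChunks and responseFlow supply the text that ends up in the delta events, while delayBetweenChunks controls the pause between them.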
Error Simulation
To test client behavior for exceptional cases:
anthropic.messages {
    // expected request
} respondsError {
    httpStatus = HttpStatusCode.InternalServerError // Set an error status code
    body = """{
      "type": "error",
      "error": {
        "type": "api_error",
        "message": "An unexpected error has occurred internal to Anthropic's systems."
      }
    }"""
    // Optionally add a delay or other properties
}
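The following sketch shows the kind of assertion a test might make on the client side: post a request, then check the status code and the error body. So that it runs on its own, it uses the JDK's built-in HttpServer as a stand-in that always answers 500. That stand-in is not MockAnthropic, and the helper names and the /v1/messages path are assumptions. In a real test, postMessages would be pointed at anthropic.baseUrl() instead.

```kotlin
import com.sun.net.httpserver.HttpServer
import java.net.InetSocketAddress
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

val errorBody =
    """{"type":"error","error":{"type":"api_error","message":"An unexpected error has occurred internal to Anthropic's systems."}}"""

// Stand-in server (NOT MockAnthropic): always replies 500 with the error body.
fun startStandInServer(): HttpServer {
    val server = HttpServer.create(InetSocketAddress("localhost", 0), 0)
    server.createContext("/v1/messages") { exchange ->
        val bytes = errorBody.toByteArray()
        exchange.responseHeaders.add("Content-Type", "application/json")
        exchange.sendResponseHeaders(500, bytes.size.toLong())
        exchange.responseBody.use { it.write(bytes) }
    }
    server.start()
    return server
}

// Hypothetical helper: POSTs a dummy body and returns (status code, response body).
fun postMessages(baseUrl: String): Pair<Int, String> {
    val request = HttpRequest.newBuilder(URI.create("$baseUrl/v1/messages"))
        .header("content-type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{}"))
        .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    return response.statusCode() to response.body()
}

fun main() {
    val server = startStandInServer()
    try {
        val (status, body) = postMessages("http://localhost:${server.address.port}")
        check(status == 500) { "expected HTTP 500 but got $status" }
        check(body.contains("\"api_error\"")) { "unexpected error body: $body" }
        println("status=$status")
    } finally {
        server.stop(0) // release the port immediately
    }
}
```

SDK clients typically surface such responses as exceptions and may retry server errors before failing, so a test built on an SDK may need retries disabled to stay fast.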
Practical Example in Tests
@Test
fun `test basic conversation`() {
    // Arrange: mock the messages API
    anthropic.messages {
        userMessageContains("Hello")
    } responds {
        assistantContent = "Hi from mock!"
    }

    // Act: call the mocked endpoint in your test code
    val result = yourAnthropicClient.sendMessage("Hello")

    // Assert: verify the response
    assertEquals("Hi from mock!", result.assistantMessage)
}
Integration with LangChain4j
You can also use the LangChain4j Kotlin Extensions:
// Set up mock response
anthropic.messages {
    userMessageContains("Hello")
} responds {
    assistantContent = "Hello"
    delay = 42.milliseconds
}

// Create the LangChain4j model
val model: AnthropicChatModel =
    AnthropicChatModel
        .builder()
        .apiKey("foo")
        .baseUrl(anthropic.baseUrl() + "/v1")
        .modelName("claude-3-5-haiku-20241022")
        .build()

// Make the request using Kotlin DSL
val result =
    model.chat {
        messages += userMessage("Say Hello")
    }

// Verify the response
result.apply {
    finishReason() shouldBe FinishReason.STOP
    tokenUsage() shouldNotBe null
    aiMessage().text() shouldBe "Hello"
}
Stream Responses
Mock streaming responses easily with flow support:
// Example 1: Using responseChunks
val userMessage = "What do we need?"
anthropic.messages {
    systemMessageContains("You are a person of 60s")
    userMessageContains(userMessage)
} respondsStream {
    responseChunks = listOf("All", " we", " need", " is", " Love")
}

// Example 2: Using responseFlow
val userMessage2 = "What is in the sea?"
anthropic.messages {
    systemMessageContains("You are a person of 60s")
    userMessageContains(userMessage2)
} respondsStream {
    responseFlow =
        flow {
            emit("Yellow")
            emit(" submarine")
        }
}

// Create the streaming model
val model: AnthropicStreamingChatModel =
    AnthropicStreamingChatModel
        .builder()
        .apiKey("foo")
        .baseUrl(anthropic.baseUrl() + "/v1")
        .modelName("claude-3-5-haiku-20241022")
        .build()

// Method 1: Using Kotlin Flow API
model
    .chatFlow {
        messages += systemMessage("You are a person of 60s")
        messages += userMessage(userMessage2)
    }.buffer(capacity = 8096)
    .collect {
        when (it) {
            is StreamingChatModelReply.PartialResponse -> {
                println("token = ${it.partialResponse}")
            }

            is StreamingChatModelReply.CompleteResponse -> {
                println("Completed: $it")
            }

            is StreamingChatModelReply.Error -> {
                println("Error: $it")
            }
        }
    }

// Method 2: Using Java-style API with a handler
model.chat(
    ChatRequest
        .builder()
        .messages(
            systemMessage("You are a person of 60s"),
            userMessage(userMessage2)
        )
        .build(),
    object : StreamingChatResponseHandler {
        override fun onCompleteResponse(completeResponse: ChatResponse) {
            println("Received CompleteResponse: $completeResponse")
        }

        override fun onPartialResponse(partialResponse: String) {
            println("Received partial response: $partialResponse")
        }

        override fun onError(error: Throwable) {
            println("Received error: $error")
        }
    }
)
Stopping the Server
anthropic.stop()
Stops the mock server and frees up resources.