Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 129 additions & 75 deletions packages/core/src/tracing/openai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { getClient } from '../../currentScopes';
import { captureException } from '../../exports';
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
import { SPAN_STATUS_ERROR } from '../../tracing';
import { startSpan, startSpanManual } from '../../tracing/trace';
import { startInactiveSpan, startSpanManual } from '../../tracing/trace';
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
import { isThenable } from '../../utils/is';
import {
GEN_AI_OPERATION_NAME_ATTRIBUTE,
GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE,
Expand Down Expand Up @@ -126,97 +127,150 @@ function addRequestAttributes(span: Span, params: Record<string, unknown>): void
}
}

/**
* Handle common error catching and reporting for streaming requests
*/
function handleStreamingError(error: unknown, span: Span, methodPath: string): never {
captureException(error, {
mechanism: { handled: false, type: 'auto.ai.openai.stream', data: { function: methodPath } },
});

if (span.isRecording()) {
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
span.end();
}
throw error;
}

/**
* Instrument a method with Sentry spans
* Following Sentry AI Agents Manual Instrumentation conventions
* @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
*
* This implementation uses Proxy and startInactiveSpan to preserve the original
* return type (e.g., OpenAI's APIPromise with .withResponse() method).
*/
function instrumentMethod<T extends unknown[], R>(
originalMethod: (...args: T) => Promise<R>,
originalMethod: (...args: T) => R | Promise<R>,
methodPath: InstrumentedMethod,
context: unknown,
options: OpenAiOptions,
): (...args: T) => Promise<R> {
return async function instrumentedMethod(...args: T): Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
const operationName = getOperationName(methodPath);
): (...args: T) => R | Promise<R> {
return new Proxy(originalMethod, {
apply(target, _thisArg, args: T): R | Promise<R> {
const requestAttributes = extractRequestAttributes(args, methodPath);
const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
const operationName = getOperationName(methodPath);

const params = args[0] as Record<string, unknown> | undefined;
const isStreamRequested = params && typeof params === 'object' && params.stream === true;

const params = args[0] as Record<string, unknown> | undefined;
const isStreamRequested = params && typeof params === 'object' && params.stream === true;
if (isStreamRequested) {
// For streaming responses, use manual span management to properly handle the async generator lifecycle
return startSpanManual(
{
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
}

if (isStreamRequested) {
// For streaming responses, use manual span management to properly handle the async generator lifecycle
return startSpanManual(
{
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
const result = await target.apply(context, args);

return instrumentStream(
result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
return handleStreamingError(error, span, methodPath);
}
},
);
}

const result = await originalMethod.apply(context, args);
// Non-streaming responses: use startInactiveSpan to preserve original return type
// (e.g., OpenAI's APIPromise with .withResponse())
//
// We use startInactiveSpan instead of startSpan/startSpanManual because those
// internally use handleCallbackErrors which calls .then() on Promises, creating
// a new Promise instance and losing APIPromise's custom methods like .withResponse().
const span = startInactiveSpan({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tried using start span here?
I understand that your purpose here is to use handleCallbackErrors and handle promises, you can still achieve this with start span, and as long as we didn't end the span in a different scope/context, it should work fine.

name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
});

return instrumentStream(
result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
span,
options.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
// For streaming requests that fail before stream creation, we still want to record
// them as streaming requests but end the span gracefully
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.openai.stream',
data: {
function: methodPath,
// Handle synchronous exceptions from the API call or request attribute processing
let result: R | Promise<R>;
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
}
result = target.apply(context, args);
} catch (err) {
captureException(err, {
mechanism: {
handled: false,
type: 'auto.ai.openai',
data: {
function: methodPath,
},
},
});
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
span.end();
throw err;
}

// Attach side-effect handlers without transforming the Promise
// This preserves the original APIPromise type and its methods like .withResponse()
if (isThenable(result)) {
Promise.resolve(result)
.then(
res => {
try {
addResponseAttributes(span, res as OpenAiResponse, options.recordOutputs);
} catch {
// Ignore attribute processing errors - they shouldn't affect the original Promise
// The span will still be ended in finally()
}
},
err => {
captureException(err, {
mechanism: {
handled: false,
type: 'auto.ai.openai',
data: {
function: methodPath,
},
},
},
});
});
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
},
)
.finally(() => {
span.end();
throw error;
}
},
);
} else {
// Non-streaming responses
return startSpan(
{
name: `${operationName} ${model}`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (options.recordInputs && params) {
addRequestAttributes(span, params);
}
});
} else {
// Synchronous result (unlikely for OpenAI API but handle it)
try {
addResponseAttributes(span, result as OpenAiResponse, options.recordOutputs);
} catch {
// Ignore attribute processing errors
} finally {
span.end();
}
}

const result = await originalMethod.apply(context, args);
addResponseAttributes(span, result, options.recordOutputs);
return result;
} catch (error) {
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.openai',
data: {
function: methodPath,
},
},
});
throw error;
}
},
);
}
};
// Return the original Promise (APIPromise) with all its methods intact
return result;
},
}) as (...args: T) => R | Promise<R>;
}

/**
Expand Down
Loading
Loading