microsoft/qdk

Public

mirrored fromhttps://github.com/microsoft/qdkAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
iadavis/pipeline-issue-debugging

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

source/npm/qsharp/src/workers/common.ts

572lines · modecode

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4import { type IQSharpError } from "../../lib/web/qsc_wasm.js";
5import { CancellationToken } from "../cancellation.js";
6import { QdkDiagnostics } from "../diagnostics.js";
7import { TelemetryEvent, log } from "../log.js";
8type Wasm = typeof import("../../lib/web/qsc_wasm.js");
9
10/**
11 * Describes a service that can be run in a worker.
12 */
13export interface ServiceProtocol<
14 TService extends ServiceMethods<TService>,
15 TServiceEventMsg extends IServiceEventMessage,
16> {
17 /** The concrete class that implements the service. */
18 class: { new (wasmModule: Wasm): TService };
19 /** Methods that can be proxied from the main thread to the worker. @see MethodMap*/
20 methods: MethodMap<TService>;
21 /** Events that can be received by the main thread from the worker. */
22 eventNames: TServiceEventMsg["type"][];
23}
24
25/**
26 * Used as a type constraint for a "service", i.e. an object
27 * we can create proxy methods for. The type shouldn't define
28 * any non-method properties.
29 */
30export type ServiceMethods<T> = { [x in keyof T]: (...args: any[]) => any };
31
32/**
33 * Defines the service methods that the proxy will handle and their types.
34 *
35 * "request" is a normal async method.
36 *
37 * "requestWithProgress" methods take an `IServiceEventTarget` to
38 * communicate events back to the main thread as they run. They also set
39 * the service state to "busy" while they run.
40 *
41 * "addEventListener" and "removeEventListener" methods are used to
42 * subscribe to events from the service.
43 */
44export type MethodMap<T> = {
45 [M in keyof T]:
46 | "request"
47 | "requestWithProgress"
48 | "addEventListener"
49 | "removeEventListener";
50};
51
52/** Methods added to the service when wrapped in a proxy */
53export type IServiceProxy = {
54 onstatechange: ((state: ServiceState) => void) | null;
55 terminate: () => void;
56};
57
58/** "requestWithProgress" type methods will set the service state to "busy" */
59export type ServiceState = "idle" | "busy";
60
61/** Request message from a main thread to the worker */
62export type RequestMessage<T extends ServiceMethods<T>> = {
63 [K in keyof T]: { type: K; args: Parameters<T[K]> };
64}[keyof T];
65
66/** Response message for a request from the worker to the main thread */
67export type ResponseMessage<T extends ServiceMethods<T>> = {
68 messageType: "response";
69} & {
70 [K in keyof T]: {
71 type: K;
72 result:
73 | { success: true; result: Awaited<ReturnType<T[K]>> }
74 | { success: false; error: unknown };
75 };
76}[keyof T];
77
78/** Event message from the worker to the main thread */
79export type EventMessage<TEventMsg extends IServiceEventMessage> = {
80 messageType: "event";
81} & TEventMsg;
82
83/** Used as a constraint for events defined by the service */
84export interface IServiceEventMessage {
85 type: string;
86 detail: unknown;
87}
88
89/**
90 * Common event types all workers can send.
91 */
92type CommonEvent =
93 | { type: "telemetry-event"; detail: TelemetryEvent }
94 | {
95 type: "log";
96 detail: { level: number; target: string; data: any[] };
97 };
98type CommonEventMessage = CommonEvent & { messageType: "common-event" };
99
100/**
101 * Strongly typed EventTarget interface. Used as a constraint for the
102 * event target that "requestWithProgress" methods should take in the service.
103 */
104export interface IServiceEventTarget<TEvents extends IServiceEventMessage> {
105 addEventListener<T extends TEvents["type"]>(
106 type: T,
107 listener: (event: Event & Extract<TEvents, { type: T }>) => void,
108 ): void;
109
110 removeEventListener<T extends TEvents["type"]>(
111 type: T,
112 listener: (event: Event & Extract<TEvents, { type: T }>) => void,
113 ): void;
114
115 dispatchEvent(event: Event & TEvents): boolean;
116}
117
118/** Holds state for a single request received by the proxy */
119type RequestState<
120 TService extends ServiceMethods<TService>,
121 TServiceEventMsg extends IServiceEventMessage,
122> = RequestMessage<TService> & {
123 resolve: (val: any) => void;
124 reject: (err: any) => void;
125 requestEventTarget?: IServiceEventTarget<TServiceEventMsg>;
126 cancellationToken?: CancellationToken;
127};
128
129/*
130The WorkerProxy works by queuing up requests to send over to the Worker, only
131ever having one in flight at a time. By queuing on the caller side, this allows
132for cancellation (it checks if a request is cancelled before sending to the worker).
133
134The queue contains an entry for each request with the data to send, the promise
135to resolve, the event handler, and the cancellation token. When a request completes
136the next one (if present) is fetched from the queue. If it is marked as cancelled,
137it is resolved immediately, else it is marked as the current request and the command
138sent to the worker. As events occurs on the current request the event handler is
139invoked. When the response is received this is used to resolve the promise and
140complete the request.
141*/
142
143/**
144 * Function to create the proxy for a type. To be used from the main thread.
145 *
146 * @param postMessage A function to post messages to the worker
147 * @param terminator A function to call to tear down the worker thread
148 * @param methods A map of method names to be proxied and some metadata @see MethodMap
149 * @returns The proxy object. The caller should then set the onMsgFromWorker
150 * property to a callback that will receive messages from the worker.
151 */
152export function createProxyInternal<
153 TService extends ServiceMethods<TService>,
154 TServiceEventMsg extends IServiceEventMessage,
155>(
156 postMessage: (msg: RequestMessage<TService>) => void,
157 terminator: () => void,
158 methods: MethodMap<TService>,
159): TService &
160 IServiceProxy & {
161 onMsgFromWorker: (
162 msg: ResponseMessage<TService> | EventMessage<TServiceEventMsg>,
163 ) => void;
164 } {
165 const queue: RequestState<TService, TServiceEventMsg>[] = [];
166 const eventTarget = new EventTarget();
167 let curr: RequestState<TService, TServiceEventMsg> | undefined;
168 let state: ServiceState = "idle";
169
170 function setState(newState: ServiceState) {
171 if (state === newState) return;
172 state = newState;
173 if (proxy.onstatechange) proxy.onstatechange(state);
174 }
175
176 type ResultOf<TRespMsg> = TRespMsg extends { result: infer R } ? R : never;
177
178 function queueRequest(
179 msg: RequestMessage<TService>,
180 requestEventTarget?: IServiceEventTarget<TServiceEventMsg>,
181 cancellationToken?: CancellationToken,
182 ): Promise<ResultOf<ResponseMessage<TService>>> {
183 return new Promise((resolve, reject) => {
184 queue.push({
185 type: msg.type,
186 args: msg.args,
187 resolve,
188 reject,
189 requestEventTarget,
190 cancellationToken,
191 } as RequestState<TService, TServiceEventMsg>);
192
193 // If nothing was running when this got added, kick off processing
194 if (queue.length === 1) doNextRequest();
195 });
196 }
197
198 function doNextRequest() {
199 if (curr) return;
200
201 while ((curr = queue.shift())) {
202 if (curr.cancellationToken?.isCancellationRequested) {
203 curr.reject("cancelled");
204 continue;
205 } else {
206 break;
207 }
208 }
209 if (!curr) {
210 // Nothing else queued, signal that we're now idle and exit.
211 log.trace("Proxy: Worker queue is empty");
212 setState("idle");
213 return;
214 }
215
216 const msg = { type: curr.type, args: curr.args };
217 if (methods[curr.type] === "requestWithProgress") {
218 setState("busy");
219 }
220
221 log.trace("Proxy: Posting message to worker: %o", msg);
222 postMessage(msg);
223 }
224
225 function onMsgFromWorker(
226 msg:
227 | ResponseMessage<TService>
228 | EventMessage<TServiceEventMsg>
229 | CommonEventMessage,
230 ) {
231 if (log.getLogLevel() >= 4)
232 log.trace("Proxy: Received message from worker: %s", JSON.stringify(msg));
233
234 if (msg.messageType === "common-event") {
235 const commonEvent = msg; // assignment is necessary here for TypeScript to narrow the type
236 switch (commonEvent.type) {
237 case "telemetry-event":
238 {
239 const detail = commonEvent.detail;
240 log.logTelemetry(detail);
241 }
242 break;
243 case "log":
244 {
245 const detail = commonEvent.detail;
246 log.logWithLevel(detail.level, detail.target, ...detail.data);
247 }
248 break;
249 }
250 } else if (msg.messageType === "event") {
251 const event = new Event(msg.type) as Event & TServiceEventMsg;
252 event.detail = msg.detail;
253
254 log.trace("Proxy: Posting event: %o", msg);
255 // Post to a currently attached event target if there's a "requestWithProgress"
256 // in progress
257 curr?.requestEventTarget?.dispatchEvent(event);
258 // Also post to the general event target
259 eventTarget.dispatchEvent(event);
260 } else if (msg.messageType === "response") {
261 if (!curr) {
262 log.error("Proxy: No active request when message received: %o", msg);
263 return;
264 }
265 const result = {
266 success: msg.result.success,
267 data: msg.result.success ? msg.result.result : msg.result.error,
268 };
269 if (result.success) {
270 curr.resolve(result.data);
271 curr = undefined;
272 doNextRequest();
273 } else {
274 let err = result.data;
275
276 // The error may be a serialized error object.
277 err = deserializeIfError(err);
278
279 curr.reject(err);
280 curr = undefined;
281 doNextRequest();
282 }
283 }
284 }
285
286 // Create the proxy object to be returned
287 const proxy = {} as TService &
288 IServiceProxy & { onMsgFromWorker: typeof onMsgFromWorker };
289
290 // Assign each method with the desired proxying behavior
291 for (const methodName of Object.keys(methods) as (keyof TService &
292 string)[]) {
293 // @ts-expect-error - tricky to derive the type of the actual method here
294 proxy[methodName] = (...args: any[]) => {
295 let requestEventTarget:
296 | IServiceEventTarget<TServiceEventMsg>
297 | undefined = undefined;
298
299 switch (methods[methodName]) {
300 case "addEventListener":
301 {
302 // @ts-expect-error - can't get the typing of the rest parameters quite right
303 eventTarget.addEventListener(...args);
304 }
305 break;
306 case "removeEventListener":
307 {
308 // @ts-expect-error - can't get the typing of the rest parameters quite right
309 eventTarget.removeEventListener(...args);
310 }
311 break;
312 case "requestWithProgress": {
313 // For progress methods, the last argument is the event target
314 requestEventTarget = args[args.length - 1];
315 args = args.slice(0, args.length - 1);
316 }
317 // fallthrough
318 case "request": {
319 return queueRequest(
320 { type: methodName, args } as RequestMessage<TService>,
321 requestEventTarget,
322 );
323 }
324 }
325 };
326 }
327
328 proxy.onstatechange = null;
329 proxy.terminate = () => {
330 // Kill the worker without a chance to shutdown. May be needed if it is not responding.
331 log.info("Proxy: Terminating the worker");
332 if (curr) {
333 log.trace(
334 "Proxy: Terminating running worker item of type: %s",
335 curr.type,
336 );
337 curr.reject("terminated");
338 }
339 // Reject any outstanding items
340 while (queue.length) {
341 const item = queue.shift();
342 log.trace(
343 "Proxy: Terminating outstanding work item of type: %s",
344 item?.type,
345 );
346 item?.reject("terminated");
347 }
348 terminator();
349 };
350 proxy.onMsgFromWorker = onMsgFromWorker;
351
352 return proxy;
353}
354
355/**
356 * Function to wrap a service in a dispatcher. To be used in the worker thread.
357 *
358 * @param service The service to be wrapped
359 * @param methods A map of method names. Should match the list passed into @see createProxyInternal.
360 * @param eventNames The list of event names that the service can emit
361 * @param postMessage A function to post messages back to the main thread
362 * @returns A function that takes a message and invokes the corresponding
363 * method on the service. The caller should then set this method as a message handler.
364 */
365function createDispatcher<
366 TService extends ServiceMethods<TService>,
367 TServiceEventMsg extends IServiceEventMessage,
368>(
369 postMessage: (
370 msg: ResponseMessage<TService> | EventMessage<TServiceEventMsg>,
371 ) => void,
372 service: TService,
373 methods: MethodMap<TService>,
374 eventNames: TServiceEventMsg["type"][],
375): (req: RequestMessage<TService>) => Promise<void> {
376 log.trace("Worker: Constructing WorkerEventHandler");
377
378 function logAndPost(
379 msg: ResponseMessage<TService> | EventMessage<TServiceEventMsg>,
380 ) {
381 log.trace(
382 "Worker: Sending %s message from worker: %o",
383 msg.messageType,
384 msg,
385 );
386 postMessage(msg);
387 }
388
389 const eventTarget =
390 new EventTarget() as IServiceEventTarget<TServiceEventMsg>;
391
392 eventNames.forEach((eventName: TServiceEventMsg["type"]) => {
393 // Subscribe to all known events and forward them as messages to the main thread.
394 eventTarget.addEventListener(eventName, (ev) => {
395 logAndPost({
396 messageType: "event",
397 type: ev.type,
398 detail: ev.detail,
399 });
400 });
401
402 // If there's an addEventListener on the object itself, forward those events as well.
403 if ((service as any).addEventListener) {
404 (service as any).addEventListener(eventName, (ev: any) => {
405 logAndPost({
406 messageType: "event",
407 type: ev.type,
408 detail: ev.detail,
409 });
410 });
411 }
412 });
413
414 return function invokeMethod(req: RequestMessage<TService>) {
415 // Pass the eventTarget to the methods marked as taking progress
416 return service[req.type]
417 .call(
418 service,
419 ...req.args,
420 methods[req.type] === "requestWithProgress" ? eventTarget : undefined,
421 )
422 .then((result: any) =>
423 logAndPost({
424 messageType: "response",
425 type: req.type,
426 result: { success: true, result },
427 }),
428 )
429 .catch((err: any) => {
430 // Serialize the error if it's a known type.
431 err = serializeIfError(err);
432
433 logAndPost({
434 // If this happens then the wasm code likely threw an exception/panicked rather than
435 // completing gracefully and fulfilling the promise. Communicate to the client
436 // that there was an error and it should reject the current request
437 messageType: "response",
438 type: req.type,
439 result: { success: false, error: err },
440 });
441 });
442 };
443}
444
445/**
446 * Creates and initializes the actual service. To be used in the worker thread.
447 *
448 * @param postMessage A function to post messages back to the main thread
449 * @param serviceProtocol An object that describes the service: its constructor, methods and events
450 * @param wasm The wasm module to initialize the service with
451 * @param qscLogLevel The log level to initialize the service with
452 * @returns A function that takes a message and invokes the corresponding
453 * method on the service. The caller should then set this method as a message handler.
454 */
455export function initService<
456 TService extends ServiceMethods<TService>,
457 TServiceEventMsg extends IServiceEventMessage,
458>(
459 postMessage: (
460 msg:
461 | ResponseMessage<TService>
462 | EventMessage<TServiceEventMsg>
463 | CommonEventMessage,
464 ) => void,
465 serviceProtocol: ServiceProtocol<TService, TServiceEventMsg>,
466 wasm: Wasm,
467 qscLogLevel?: number,
468): (req: RequestMessage<TService>) => Promise<void> {
469 function postTelemetryMessage(telemetry: TelemetryEvent) {
470 postMessage({
471 messageType: "common-event",
472 type: "telemetry-event",
473 detail: telemetry,
474 });
475 }
476
477 function postLogMessage(level: number, target: string, ...args: any) {
478 if (log.getLogLevel() < level) {
479 return;
480 }
481
482 let data = args;
483 try {
484 // Only structured cloneable objects can be sent in worker messages.
485 // Test if this is the case.
486 structuredClone(args);
487 } catch {
488 // Uncloneable object.
489 // Use String(args) instead of ${args} to handle all possible values
490 // without throwing. See: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String#string_coercion
491 data = ["unsupported log data " + String(args)];
492 }
493 postMessage({
494 messageType: "common-event",
495 type: "log",
496 detail: { level, target, data },
497 });
498 }
499
500 // Override the global logger
501 log.error = (...args) => postLogMessage(1, "worker", ...args);
502 log.warn = (...args) => postLogMessage(2, "worker", ...args);
503 log.info = (...args) => postLogMessage(3, "worker", ...args);
504 log.debug = (...args) => postLogMessage(4, "worker", ...args);
505 log.trace = (...args) => postLogMessage(5, "worker", ...args);
506
507 if (qscLogLevel !== undefined) {
508 log.setLogLevel(qscLogLevel);
509 }
510
511 // Set up logging and telemetry as soon as possible after instantiating
512 log.onLevelChanged = (level) => wasm.setLogLevel(level);
513 log.setTelemetryCollector(postTelemetryMessage);
514 wasm.initLogging(postLogMessage, log.getLogLevel());
515
516 // Create the actual service and return the dispatcher method
517 const service = new serviceProtocol.class(wasm);
518 return createDispatcher(
519 postMessage,
520 service,
521 serviceProtocol.methods,
522 serviceProtocol.eventNames,
523 );
524}
525
526/**
527 * Serializes an error, if it is a known type, so that it can be sent between threads.
528 *
529 * By default, browsers can only send certain types of errors between the main thread and a worker.
530 * See: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#error_types
531 *
532 * Serializing our own custom errors allows us to send them between threads.
533 */
534function serializeIfError(err: unknown) {
535 if (err instanceof QdkDiagnostics) {
536 err = { name: err.name, data: err.diagnostics };
537 } else if (err instanceof WebAssembly.RuntimeError) {
538 err = {
539 name: "WebAssembly.RuntimeError",
540 message: err.message,
541 stack: err.stack,
542 };
543 }
544 return err;
545}
546
547/**
548 * Deserializes an error if it is a known type.
549 *
550 * By default, browsers can only send certain types of errors between the main thread and a worker.
551 * See: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#error_types
552 *
553 * Serializing our own custom errors allows us to send them between threads.
554 */
555function deserializeIfError(err: unknown) {
556 if (err !== null && typeof err === "object" && "name" in err) {
557 if (err.name === "QdkDiagnostics" && "data" in err) {
558 err = new QdkDiagnostics(err.data as IQSharpError[]);
559 } else if (
560 err.name === "WebAssembly.RuntimeError" &&
561 "message" in err &&
562 (typeof err.message === "string" || typeof err.message === "undefined") &&
563 "stack" in err &&
564 (typeof err.stack === "string" || typeof err.stack === "undefined")
565 ) {
566 const newErr = new WebAssembly.RuntimeError(err.message);
567 newErr.stack = err.stack;
568 err = newErr;
569 }
570 }
571 return err;
572}
573