microsoft/teams.net

Public

mirrored from https://github.com/microsoft/teams.netAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
samples/repro-da

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs

211lines · modecode

1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT License.
3
4using System.Collections.Concurrent;
5
6using Microsoft.Teams.Api;
7using Microsoft.Teams.Api.Activities;
8using Microsoft.Teams.Api.Entities;
9using Microsoft.Teams.Apps.Plugins;
10
11using static Microsoft.Teams.Common.Extensions.TaskExtensions;
12
13namespace Microsoft.Teams.Plugins.AspNetCore;
14
15public partial class AspNetCorePlugin
16{
17 public class Stream : IStreamer
18 {
19 public bool Closed => _closedAt is not null;
20 public int Count => _count;
21 public int Sequence => _index;
22
23 public required Func<IActivity, Task<IActivity>> Send { get; set; }
24 public event IStreamer.OnChunkHandler OnChunk = (_) => { };
25
26 protected int _index = 1;
27 protected string? _id;
28 protected string _text = string.Empty;
29 protected ChannelData _channelData = new();
30 protected List<Attachment> _attachments = [];
31 protected List<IEntity> _entities = [];
32 protected ConcurrentQueue<IActivity> _queue = [];
33
34 private DateTime? _closedAt;
35 private int _count = 0;
36 private MessageActivity? _result;
37 private readonly SemaphoreSlim _lock = new(1, 1);
38 private Timer? _timeout;
39
40 public void Emit(MessageActivity activity)
41 {
42 if (_timeout != null)
43 {
44 _timeout.Dispose();
45 _timeout = null;
46 }
47
48 _queue.Enqueue(activity);
49 _timeout = new Timer(_ =>
50 {
51 _ = Flush();
52 }, null, 500, Timeout.Infinite);
53 }
54
55 public void Emit(TypingActivity activity)
56 {
57 if (_timeout != null)
58 {
59 _timeout.Dispose();
60 _timeout = null;
61 }
62
63 _queue.Enqueue(activity);
64 _timeout = new Timer(_ =>
65 {
66 _ = Flush();
67 }, null, 500, Timeout.Infinite);
68 }
69
70 public void Emit(string text)
71 {
72 Emit(new MessageActivity(text));
73 }
74
75 public void Update(string text)
76 {
77 Emit(new TypingActivity(text)
78 {
79 ChannelData = new()
80 {
81 StreamType = StreamType.Informative
82 }
83 });
84 }
85
86 public async Task<MessageActivity?> Close(CancellationToken cancellationToken = default)
87 {
88 if (_index == 1 && _queue.Count == 0 && _lock.CurrentCount > 0) return null;
89 if (_result is not null) return _result;
90 while (_id is null || _queue.Count > 0)
91 {
92 await Task.Delay(50, cancellationToken);
93 }
94
95 if (_text == string.Empty && _attachments.Count == 0) // when only informative updates are present
96 {
97 _text = "Streaming closed with no content";
98 }
99
100 var activity = new MessageActivity(_text)
101 .AddAttachment(_attachments.ToArray());
102
103 activity.WithId(_id);
104 activity.WithData(_channelData);
105 activity.AddEntity(_entities.ToArray());
106 activity.AddStreamFinal();
107
108 var res = await Retry(() => Send(activity)).ConfigureAwait(false);
109 OnChunk(res);
110
111 _result = activity;
112 _closedAt = DateTime.Now;
113 _index = 1;
114 _id = null;
115 _text = string.Empty;
116 _attachments = [];
117 _entities = [];
118 _channelData = new();
119
120 return (MessageActivity)res;
121 }
122
123 protected async Task Flush()
124 {
125 if (_queue.Count == 0) return;
126
127 await _lock.WaitAsync();
128
129 try
130 {
131 if (_timeout != null)
132 {
133 _timeout.Dispose();
134 _timeout = null;
135 }
136
137 Queue<TypingActivity> informativeUpdates = new();
138 var dequeued = 0;
139
140 while (_queue.TryDequeue(out var activity))
141 {
142 if (activity is MessageActivity message)
143 {
144 _text += message.Text;
145 _attachments.AddRange(message.Attachments ?? []);
146 _entities.AddRange(message.Entities ?? []);
147 }
148
149 if (activity.ChannelData is not null)
150 {
151 _channelData = _channelData.Merge(activity.ChannelData);
152 }
153
154 if (activity is TypingActivity typing && typing.ChannelData?.StreamType == StreamType.Informative && _text == string.Empty)
155 {
156 // If `_text` is not empty then it's possible that streaming has started.
157 // And so informative updates cannot be sent.
158 informativeUpdates.Enqueue(typing);
159 }
160
161 dequeued++;
162 _count++;
163 }
164
165 if (dequeued == 0) return;
166
167 // Send informative updates
168 if (informativeUpdates.Count > 0)
169 {
170 while (informativeUpdates.TryDequeue(out var typing))
171 {
172 await SendActivity(typing);
173 }
174 }
175
176 // Send text chunk
177 if (_text != string.Empty)
178 {
179 var toSend = new TypingActivity(_text);
180 await SendActivity(toSend);
181 }
182
183 if (_queue.Count > 0)
184 {
185 _timeout = new Timer(_ =>
186 {
187 _ = Flush();
188 }, null, 500, Timeout.Infinite);
189 }
190
191 async Task SendActivity(TypingActivity toSend)
192 {
193 if (_id is not null)
194 {
195 toSend.WithId(_id);
196 }
197
198 toSend.AddStreamUpdate(_index);
199 var res = await Retry(() => Send(toSend)).ConfigureAwait(false);
200 OnChunk(res);
201 _id ??= res.Id;
202 _index++;
203 }
204 }
205 finally
206 {
207 _lock.Release();
208 }
209 }
210 }
211}