microsoft/teams.net

Public

mirrored from https://github.com/microsoft/teams.net · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
next/core

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs

212lines · modecode

1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the MIT License.
3
4using System.Collections.Concurrent;
5
6using Microsoft.Teams.Api;
7using Microsoft.Teams.Api.Activities;
8using Microsoft.Teams.Api.Entities;
9using Microsoft.Teams.Apps.Plugins;
10
11using static Microsoft.Teams.Common.Extensions.TaskExtensions;
12
13namespace Microsoft.Teams.Plugins.AspNetCore;
14
public partial class AspNetCorePlugin
{
    /// <summary>
    /// <see cref="IStreamer"/> implementation that queues emitted activities,
    /// forwards them through <see cref="Send"/> in debounced batches as
    /// streaming updates, and finishes with a single final message when
    /// <see cref="Close"/> is called.
    /// </summary>
    public class Stream : IStreamer
    {
        /// <summary>Delay, in milliseconds, between the most recent emit and the flush it schedules.</summary>
        private const int FlushDelayMilliseconds = 500;

        /// <summary>Interval, in milliseconds, at which <see cref="Close"/> polls for outstanding work.</summary>
        private const int ClosePollMilliseconds = 50;

        /// <summary>True once <see cref="Close"/> has sent the final message.</summary>
        public bool Closed => _closedAt is not null;

        /// <summary>Number of queued activities that <see cref="Flush"/> has dequeued so far.</summary>
        public int Count => _count;

        /// <summary>Sequence number that will be attached to the next streaming update.</summary>
        public int Sequence => _index;

        /// <summary>Delegate used to deliver every outgoing activity; its result is passed to <see cref="OnChunk"/>.</summary>
        public required Func<IActivity, Task<IActivity>> Send { get; set; }

        /// <summary>Raised with the result of <see cref="Send"/> after each update and after the final message.</summary>
        public event IStreamer.OnChunkHandler OnChunk = (_) => { };

        // Accumulated stream state. Reset by Close after the final message is sent.
        protected int _index = 1;
        protected string? _id;
        protected string _text = string.Empty;
        protected ChannelData _channelData = new();
        protected List<Attachment> _attachments = [];
        protected List<IEntity> _entities = [];
        protected ConcurrentQueue<IActivity> _queue = [];

        private DateTime? _closedAt;
        private int _count = 0;
        private MessageActivity? _result;

        // Serializes Flush passes and the final send performed by Close.
        private readonly SemaphoreSlim _lock = new(1, 1);

        // Pending debounce timer. Touched from caller threads (Emit) and timer
        // threads (Flush), so it is only ever swapped via Interlocked.Exchange.
        private Timer? _timeout;

        /// <summary>Queues a message activity and (re)starts the flush debounce timer.</summary>
        /// <param name="activity">The message whose text, attachments and entities are appended to the stream.</param>
        public void Emit(MessageActivity activity)
        {
            Enqueue(activity);
        }

        /// <summary>Queues a typing activity and (re)starts the flush debounce timer.</summary>
        /// <param name="activity">The typing activity to queue.</param>
        public void Emit(TypingActivity activity)
        {
            Enqueue(activity);
        }

        /// <summary>Queues <paramref name="text"/> as a new <see cref="MessageActivity"/>.</summary>
        /// <param name="text">Text chunk to append to the stream.</param>
        public void Emit(string text)
        {
            Emit(new MessageActivity(text));
        }

        /// <summary>
        /// Queues an informative update: a typing activity whose channel data
        /// has <see cref="StreamType.Informative"/> as its stream type.
        /// </summary>
        /// <param name="text">Text of the informative update.</param>
        public void Update(string text)
        {
            Emit(new TypingActivity(text)
            {
                ChannelData = new()
                {
                    StreamType = StreamType.Informative
                }
            });
        }

        /// <summary>
        /// Waits for queued activities to be flushed, then sends the final
        /// message built from the accumulated text, attachments, entities and
        /// channel data, and resets the accumulated state.
        /// </summary>
        /// <param name="cancellationToken">Cancels the wait for pending flushes.</param>
        /// <returns>
        /// The result of sending the final message; <c>null</c> when nothing has
        /// been emitted and no flush is running; or the previously built final
        /// activity when one is already cached.
        /// </returns>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="cancellationToken"/> was cancelled while waiting.
        /// </exception>
        public async Task<MessageActivity?> Close(CancellationToken cancellationToken = default)
        {
            // NOTE(review): this check runs before the cached-result check, and Close
            // resets _index to 1, so a repeated Close on an idle stream returns null.
            if (_index == 1 && _queue.Count == 0 && _lock.CurrentCount > 0) return null;
            if (_result is not null) return _result;

            // Wait until at least one update has produced a stream id and the queue is drained.
            while (_id is null || _queue.Count > 0)
            {
                await Task.Delay(ClosePollMilliseconds, cancellationToken).ConfigureAwait(false);
            }

            // Flush empties the queue *before* it awaits its sends, so an empty queue
            // does not mean the last chunk has gone out. Taking the lock guarantees the
            // final message is sent only after any in-flight Flush has completed.
            await _lock.WaitAsync(cancellationToken).ConfigureAwait(false);

            try
            {
                // Another Close may have finished while this call waited for the lock.
                if (_result is not null) return _result;

                if (_text == string.Empty && _attachments.Count == 0) // when only informative updates are present
                {
                    _text = "Streaming closed with no content";
                }

                var activity = new MessageActivity(_text)
                    .AddAttachment(_attachments.ToArray());

                activity.WithId(_id);
                activity.WithData(_channelData);
                activity.AddEntity(_entities.ToArray());
                activity.AddStreamFinal();

                var res = await Retry(() => Send(activity)).ConfigureAwait(false);
                OnChunk(res);

                // The outgoing activity (not the send result) is what gets cached.
                _result = activity;
                _closedAt = DateTime.UtcNow;
                _index = 1;
                _id = null;
                _text = string.Empty;
                _attachments = [];
                _entities = [];
                _channelData = new();

                return (MessageActivity)res;
            }
            finally
            {
                _lock.Release();
            }
        }

        /// <summary>
        /// Dequeues a batch of activities, folds them into the accumulated
        /// stream state, and sends the resulting informative updates and text
        /// chunk. Re-arms the timer when activities remain queued.
        /// </summary>
        protected async Task Flush()
        {
            if (_queue.Count == 0) return;

            await _lock.WaitAsync().ConfigureAwait(false);

            try
            {
                // This pass handles whatever is queued; a pending timer is redundant.
                CancelTimeout();

                var i = 0;

                Queue<TypingActivity> informativeUpdates = new();

                // The counter runs 0..10, so a single pass drains at most 11 activities.
                while (i <= 10 && _queue.TryDequeue(out var activity))
                {
                    if (activity is MessageActivity message)
                    {
                        _text += message.Text;
                        _attachments.AddRange(message.Attachments ?? []);
                        _entities.AddRange(message.Entities ?? []);
                    }

                    if (activity.ChannelData is not null)
                    {
                        _channelData = _channelData.Merge(activity.ChannelData);
                    }

                    if (activity is TypingActivity typing && typing.ChannelData?.StreamType == StreamType.Informative && _text == string.Empty)
                    {
                        // If `_text` is not empty then it's possible that streaming has started.
                        // And so informative updates cannot be sent.
                        informativeUpdates.Enqueue(typing);
                    }

                    i++;
                    _count++;
                }

                if (i == 0) return;

                // Send informative updates
                while (informativeUpdates.TryDequeue(out var informative))
                {
                    await SendActivity(informative);
                }

                // Send text chunk (the full accumulated text, not just the new part)
                if (_text != string.Empty)
                {
                    await SendActivity(new TypingActivity(_text));
                }

                if (_queue.Count > 0)
                {
                    ScheduleFlush();
                }

                // Stamps the update with the stream id (once known) and the next
                // sequence number, sends it, and records the id from the first result.
                async Task SendActivity(TypingActivity toSend)
                {
                    if (_id is not null)
                    {
                        toSend.WithId(_id);
                    }

                    toSend.AddStreamUpdate(_index);
                    var res = await Retry(() => Send(toSend)).ConfigureAwait(false);
                    OnChunk(res);
                    _id ??= res.Id;
                    _index++;
                }
            }
            finally
            {
                _lock.Release();
            }
        }

        /// <summary>Adds <paramref name="activity"/> to the queue and restarts the debounce timer.</summary>
        private void Enqueue(IActivity activity)
        {
            _queue.Enqueue(activity);
            ScheduleFlush();
        }

        /// <summary>
        /// Installs a one-shot timer that calls <see cref="Flush"/> after
        /// <see cref="FlushDelayMilliseconds"/>, disposing any timer it replaces.
        /// </summary>
        private void ScheduleFlush()
        {
            // The task returned by Flush is deliberately not awaited (fire-and-forget).
            var next = new Timer(state =>
            {
                _ = Flush();
            }, null, FlushDelayMilliseconds, Timeout.Infinite);

            Interlocked.Exchange<Timer?>(ref _timeout, next)?.Dispose();
        }

        /// <summary>Disposes the pending debounce timer, if there is one.</summary>
        private void CancelTimeout()
        {
            Interlocked.Exchange<Timer?>(ref _timeout, null)?.Dispose();
        }
    }
}