Skip to content

Package: ThreadSafeBazaar_ITest$2

ThreadSafeBazaar_ITest$2

name | instruction | branch | complexity | line | method
run()
M: 0 C: 20
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 6
100%
M: 0 C: 1
100%
{...}
M: 0 C: 6
100%
M: 0 C: 0
100%
M: 0 C: 1
100%
M: 0 C: 1
100%
M: 0 C: 1
100%

Coverage

1: /*******************************************************************************
2: * Copyright (c) 2018 Christian W. Damus and others.
3: *
4: * All rights reserved. This program and the accompanying materials
5: * are made available under the terms of the Eclipse Public License 2.0
6: * which accompanies this distribution, and is available at
7: * https://www.eclipse.org/legal/epl-2.0/
8: *
9: * SPDX-License-Identifier: EPL-2.0
10: *
11: * Contributors:
12: * Christian W. Damus - initial API and implementation
13: ******************************************************************************/
14: package org.eclipse.emfforms.bazaar.internal;
15:
16: import static java.lang.Integer.parseInt;
17: import static java.util.Arrays.asList;
18: import static org.hamcrest.CoreMatchers.is;
19: import static org.hamcrest.MatcherAssert.assertThat;
20: import static org.junit.Assert.fail;
21:
22: import java.util.ArrayList;
23: import java.util.Arrays;
24: import java.util.HashSet;
25: import java.util.List;
26: import java.util.Set;
27: import java.util.concurrent.BrokenBarrierException;
28: import java.util.concurrent.Callable;
29: import java.util.concurrent.CyclicBarrier;
30: import java.util.concurrent.ExecutionException;
31: import java.util.concurrent.ExecutorService;
32: import java.util.concurrent.Executors;
33: import java.util.concurrent.Future;
34: import java.util.concurrent.ThreadFactory;
35: import java.util.concurrent.TimeUnit;
36: import java.util.concurrent.TimeoutException;
37: import java.util.concurrent.atomic.AtomicInteger;
38:
39: import org.eclipse.emfforms.bazaar.Bazaar;
40: import org.eclipse.emfforms.bazaar.BazaarContext;
41: import org.eclipse.emfforms.bazaar.BazaarContextFunction;
42: import org.eclipse.emfforms.bazaar.Bid;
43: import org.eclipse.emfforms.bazaar.Create;
44: import org.eclipse.emfforms.bazaar.Exchange;
45: import org.eclipse.emfforms.bazaar.Vendor;
46: import org.junit.After;
47: import org.junit.Before;
48: import org.junit.Test;
49:
50: /**
51: * Specific concurrency tests for the {@link ThreadSafeBazaar}. Other
52: * tests are covered by the {@link Bazaar_Test} suite.
53: *
54: * @author Christian W. Damus
55: */
56: @SuppressWarnings("nls")
57: public class ThreadSafeBazaar_ITest {
58:
59:         static final int NUM_THREADS = 3;
60:
61:         private static AtomicInteger testCount = new AtomicInteger();
62:         private final Bazaar<String> bazaar = new ThreadSafeBazaar<String>();
63:         private volatile CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
64:         private ExecutorService exec;
65:
66:         /**
67:          * Initializes e
68:          */
69:         public ThreadSafeBazaar_ITest() {
70:                 super();
71:         }
72:
73:         @Test
74:         public void concurrentCreateProduct() {
75:                 testTemplateConcurrentRead(auctionCallable(), setOf("A", "B", "C"));
76:         }
77:
78:         <T> void testTemplateConcurrentRead(Callable<T> auction, Set<T> expected) {
79:                 final Set<T> actual = asyncAuction(auction, null);
80:                 assertThat(actual, is(expected));
81:         }
82:
83:         @SuppressWarnings("unchecked")
84:         @Test
85:         public void concurrentCreateProducts() {
86:                 testTemplateConcurrentRead(multiAuctionCallable(),
87:                         setOf(asList("A", "B", "C"), asList("B", "A", "C"), asList("C", "A", "B")));
88:         }
89:
90:         @SuppressWarnings("unchecked")
91:         @Test
92:         public void concurrentAddVendor() {
93:                 // Run an auction on three threads, that will stop at the barrier while
94:                 // bidding is in progress. At that point, add another vendor, then sync
95:                 // again to let the auction threads complete
96:                 final Callable<List<String>> auction = multiAuctionCallable();
97:                 final Runnable addVendor = new Runnable() {
98:                         @Override
99:                         public void run() {
100:                                 // Synchronize with the auction threads
101:                                 barrierSync();
102:
103:                                 // Add a vendor
104:                                 bazaar.addVendor(new StringVendor("D"));
105:
106:                                 // And let everyone finish
107:                                 barrierSync();
108:                         }
109:                 };
110:
111:                 // The new vendor did not participate
112:                 testTemplateConcurrentWrite(auction, addVendor,
113:                         setOf(asList("A", "B", "C"), asList("B", "A", "C"), asList("C", "A", "B")));
114:         }
115:
116:         <T> void testTemplateConcurrentWrite(Callable<T> auction, Runnable write, Set<T> expected) {
117:                 // Make room for one more party in synchronizing the bidding
118:                 barrier = new CyclicBarrier(NUM_THREADS + 1);
119:
120:                 final Set<T> products = asyncAuction(auction, write);
121:
122:                 // The auction completed as expected
123:                 assertThat(products, is(expected));
124:         }
125:
126:         @SuppressWarnings("unchecked")
127:         @Test
128:         public void concurrentAddContextFunction() {
129:                 // Run an auction on three threads, that will stop at the barrier while
130:                 // bidding is in progress. At that point, add a context function, then sync
131:                 // again to let the auction threads complete
132:                 final Callable<List<String>> auction = multiAuctionCallable();
133:                 final Runnable addVendor = new Runnable() {
134:                         @Override
135:                         public void run() {
136:                                 // Synchronize with the auction threads
137:                                 barrierSync();
138:
139:                                 final Thread wrongThread = Thread.currentThread();
140:
141:                                 // Add a context function that would derail the auction
142:                                 bazaar.addContextFunction(Thread.class.getName(),
143:                                         new BazaarContextFunction() {
144:                                                 @Exchange
145:                                                 public Thread computeThread() {
146:                                                         return wrongThread;
147:                                                 }
148:                                         });
149:
150:                                 // And let everyone finish
151:                                 barrierSync();
152:                         }
153:                 };
154:
155:                 testTemplateConcurrentWrite(auction, addVendor,
156:                         setOf(asList("A", "B", "C"), asList("B", "A", "C"), asList("C", "A", "B")));
157:         }
158:
159:         @Test
160:         public void concurrentSetOverlapCallback() {
161:                 // This vendor ties with the best of the others
162:                 bazaar.addVendor(new SimpleStringVendor("D", 100.0));
163:
164:                 // Run an auction on three threads, that will stop at the barrier while
165:                 // bidding is in progress. At that point, set an overlap call-back, then sync
166:                 // again to let the auction threads complete
167:                 final Callable<String> auction = auctionCallable();
168:                 final Runnable addVendor = new Runnable() {
169:                         @Override
170:                         public void run() {
171:                                 // Synchronize with the auction threads
172:                                 barrierSync();
173:
174:                                 // Set an overlap call-back that would derail the auction
175:                                 bazaar.setPriorityOverlapCallBack(new Bazaar.PriorityOverlapCallBack<String>() {
176:                                         @Override
177:                                         public void priorityOverlap(Vendor<? extends String> winner, Vendor<? extends String> overlapping) {
178:                                                 fail("Overlap call-back must not be called.");
179:                                         }
180:                                 });
181:
182:                                 // And let everyone finish
183:                                 barrierSync();
184:                         }
185:                 };
186:
187:                 testTemplateConcurrentWrite(auction, addVendor, setOf("A", "B", "C"));
188:         }
189:
190:         //
191:         // Test framework
192:         //
193:
194:         @Before
195:         public void createFixture() {
196:                 exec = createExecutorService(NUM_THREADS);
197:
198:                 bazaar.addVendor(new StringVendor("A"));
199:                 bazaar.addVendor(new StringVendor("B"));
200:                 bazaar.addVendor(new StringVendor("C"));
201:         }
202:
203:         ExecutorService createExecutorService(int threads) {
204:                 return Executors.newFixedThreadPool(threads, new ThreadFactory() {
205:                         private final int generation = testCount.incrementAndGet();
206:                         private final AtomicInteger ch = new AtomicInteger('A');
207:
208:                         @Override
209:                         public Thread newThread(Runnable r) {
210:                                 return new Thread(r, String.format("Bazaar-%s-%s",
211:                                         generation,
212:                                         (char) ch.getAndIncrement()));
213:                         }
214:                 });
215:         }
216:
217:         @After
218:         public void destroyFixture() {
219:                 exec.shutdownNow();
220:                 exec = null;
221:
222:                 barrier.reset();
223:         }
224:
225:         class SimpleStringVendor implements Vendor<String> {
226:                 private final String product;
227:                 private final double bid;
228:
229:                 SimpleStringVendor(String product, double bid) {
230:                         super();
231:
232:                         this.product = product;
233:                         this.bid = bid;
234:                 }
235:
236:                 @Bid
237:                 public double bid() {
238:                         return bid;
239:                 }
240:
241:                 @Create
242:                 public String create() {
243:                         return product;
244:                 }
245:         }
246:
247:         class StringVendor implements Vendor<String> {
248:                 private final String product;
249:
250:                 StringVendor(String product) {
251:                         super();
252:
253:                         this.product = product;
254:                 }
255:
256:                 @Bid
257:                 public double bid(Thread thread) throws InterruptedException, BrokenBarrierException, TimeoutException {
258:                         final String threadKey = lastChar(thread.getName());
259:                         if (threadKey.equals(product)) {
260:                                 // Ensure that all threads are in the midst of the auction
261:                                 barrierSync();
262:
263:                                 return 100.0;
264:                         }
265:
266:                         return Math.abs(16 - parseInt(product, 16)); // Should be non-negative
267:                 }
268:
269:                 private String lastChar(String s) {
270:                         return s.substring(s.length() - 1);
271:                 }
272:
273:                 @Create
274:                 public String create() {
275:                         return product;
276:                 }
277:         }
278:
279:         <T> Set<T> asyncAuction(Callable<T> auctionCallable, Runnable duringAuction) {
280:                 final List<Future<T>> asyncProducts = new ArrayList<Future<T>>(NUM_THREADS);
281:                 for (int i = 0; i < NUM_THREADS; i++) {
282:                         asyncProducts.add(exec.submit(auctionCallable));
283:                 }
284:
285:                 if (duringAuction != null) {
286:                         duringAuction.run();
287:                 }
288:
289:                 return getAll(asyncProducts);
290:         }
291:
292:         void barrierSync() {
293:                 try {
294:                         barrier.await(1L, TimeUnit.SECONDS);
295:                 } catch (final InterruptedException ex) {
296:                         fail("Test interrupted while synchronizing at barrier.");
297:                 } catch (final BrokenBarrierException ex) {
298:                         fail("Broken barrier in test thread synchronization.");
299:                 } catch (final TimeoutException ex) {
300:                         fail("Timed out in synchronization at barrier.");
301:                 }
302:
303:         }
304:
305:         <T> Set<T> getAll(Iterable<? extends Future<? extends T>> futures) {
306:                 final Set<T> result = new HashSet<T>();
307:
308:                 for (final Future<? extends T> next : futures) {
309:                         try {
310:                                 result.add(next.get(1L, TimeUnit.SECONDS));
311:                         } catch (final InterruptedException ex) {
312:                                 fail("Test interrupted while waiting for async product.");
313:                         } catch (final ExecutionException ex) {
314:                                 ex.getCause().printStackTrace();
315:                                 fail("Async auction failed with an exception");
316:                         } catch (final TimeoutException ex) {
317:                                 fail("Test timed out waiting for async product.");
318:                         }
319:                 }
320:
321:                 return result;
322:         }
323:
324:         static <T> Set<T> setOf(T... elements) {
325:                 return new HashSet<T>(Arrays.asList(elements));
326:         }
327:
328:         Callable<String> auctionCallable() {
329:                 return new Callable<String>() {
330:                         @Override
331:                         public String call() throws Exception {
332:                                 final BazaarContext ctx = BazaarContext.Builder.empty()
333:                                         .put(Thread.class, Thread.currentThread()).build();
334:                                 final String result = bazaar.createProduct(ctx);
335:
336:                                 // Synchronize on the way out of the auction
337:                                 barrierSync();
338:
339:                                 return result;
340:                         }
341:                 };
342:         }
343:
344:         Callable<List<String>> multiAuctionCallable() {
345:                 return new Callable<List<String>>() {
346:                         @Override
347:                         public List<String> call() throws Exception {
348:                                 final BazaarContext ctx = BazaarContext.Builder.empty()
349:                                         .put(Thread.class, Thread.currentThread()).build();
350:                                 final List<String> result = bazaar.createProducts(ctx);
351:
352:                                 // Synchronize on the way out of the auction
353:                                 barrierSync();
354:
355:                                 return result;
356:                         }
357:                 };
358:         }
359: }