4747import java .util .List ;
4848import java .util .Map ;
4949import java .util .SortedMap ;
50- import java .util .TreeMap ;
5150import java .util .WeakHashMap ;
5251import java .util .concurrent .ConcurrentHashMap ;
52+ import java .util .concurrent .ConcurrentSkipListMap ;
5353import java .util .concurrent .CountDownLatch ;
5454import java .util .concurrent .LinkedBlockingQueue ;
5555import java .util .concurrent .Semaphore ;
5656import java .util .concurrent .atomic .AtomicBoolean ;
57+ import java .util .concurrent .atomic .AtomicInteger ;
5758import java .util .concurrent .atomic .AtomicLong ;
5859import java .util .concurrent .locks .ReentrantLock ;
5960import java .util .logging .Level ;
@@ -564,13 +565,20 @@ public boolean compareAndSetExiting(boolean expect, boolean update) {
564565
565566 public static final class SharedMultiprocessingData {
566567
567- private int fdCounter = 0 ;
568+ /**
569+ * A sentinel object that remains in the {@link LinkedBlockingQueue} in the
570+ * {@link #pipeData}. It is pushed there in #close so that any blocking #take calls can wake
571+ * up and react to the end of the stream.
572+ */
573+ private static final Object SENTINEL = new Object ();
574+
575+ private final AtomicInteger fdCounter = new AtomicInteger (0 );
568576
569577 /**
570578 * Maps the two fake file descriptors created in {@link #pipe()} to one
571579 * {@link LinkedBlockingQueue}
572580 */
573- private final SortedMap <Integer , LinkedBlockingQueue <Object >> pipeData = new TreeMap <>();
581+ private final SortedMap <Integer , LinkedBlockingQueue <Object >> pipeData = new ConcurrentSkipListMap <>();
574582
575583 /**
576584 * Holds ref count of file descriptors which were passed over to a spawned child context.
@@ -580,7 +588,7 @@ public static final class SharedMultiprocessingData {
580588 * <li>real file descriptors coming from the posix implementation</li>
581589 * </ul>
582590 */
583- private final Map <Integer , Integer > fdRefCount = new HashMap <>();
591+ private final Map <Integer , Integer > fdRefCount = new ConcurrentHashMap <>();
584592
585593 public SharedMultiprocessingData (ConcurrentHashMap <String , Semaphore > namedSemaphores ) {
586594 this .namedSemaphores = namedSemaphores ;
@@ -591,9 +599,7 @@ public SharedMultiprocessingData(ConcurrentHashMap<String, Semaphore> namedSemap
591599 */
592600 @ TruffleBoundary
593601 private void incrementFDRefCount (int fd ) {
594- synchronized (fdRefCount ) {
595- fdRefCount .compute (fd , (f , count ) -> (count == null ) ? 1 : count + 1 );
596- }
602+ fdRefCount .compute (fd , (f , count ) -> (count == null ) ? 1 : count + 1 );
597603 }
598604
599605 /**
@@ -604,19 +610,15 @@ private void incrementFDRefCount(int fd) {
604610 */
605611 @ TruffleBoundary
606612 public boolean decrementFDRefCount (int fd ) {
607- synchronized (fdRefCount ) {
608- Integer c = fdRefCount .get (fd );
609- if (c == null ) {
610- return false ;
611- }
612- if (c == 0 ) {
613- fdRefCount .remove (fd );
614- return false ;
613+ Integer cnt = fdRefCount .computeIfPresent (fd , (f , count ) -> {
614+ if (count == 0 || count == Integer .MIN_VALUE ) {
615+ return Integer .MIN_VALUE ;
615616 } else {
616- fdRefCount . put ( fd , c - 1 ) ;
617- return true ;
617+ assert count > 0 ;
618+ return count - 1 ;
618619 }
619- }
620+ });
621+ return cnt != null && !fdRefCount .remove (fd , Integer .MIN_VALUE );
620622 }
621623
622624 /**
@@ -625,86 +627,122 @@ public boolean decrementFDRefCount(int fd) {
625627 */
626628 @ TruffleBoundary
627629 public int [] pipe () {
628- synchronized (pipeData ) {
629- LinkedBlockingQueue <Object > q = new LinkedBlockingQueue <>();
630- int readFD = --fdCounter ;
631- pipeData .put (readFD , q );
632- int writeFD = --fdCounter ;
633- pipeData .put (writeFD , q );
634- return new int []{readFD , writeFD };
635- }
630+ LinkedBlockingQueue <Object > q = new LinkedBlockingQueue <>();
631+ int writeFD = fdCounter .addAndGet (-2 );
632+ assert isWriteFD (writeFD );
633+ int readFD = getPairFd (writeFD );
634+ pipeData .put (readFD , q );
635+ pipeData .put (writeFD , q );
636+ return new int []{readFD , writeFD };
636637 }
637638
639+ /**
640+ * Adding pipe data needs no special synchronization, since we guarantee there is only ever
641+ * one or no queue registered for a given fd.
642+ */
638643 @ TruffleBoundary
639644 public void addPipeData (int fd , byte [] bytes , Runnable noFDHandler , Runnable brokenPipeHandler ) {
640- LinkedBlockingQueue < Object > q = null ;
641- synchronized ( pipeData ) {
642- q = pipeData . get ( fd );
643- if ( q == null ) {
644- noFDHandler .run ();
645- throw CompilerDirectives .shouldNotReachHere ();
646- }
647- int fd2 = getPairFd (fd );
648- if (isClosed (fd2 )) {
649- brokenPipeHandler . run ();
650- throw CompilerDirectives . shouldNotReachHere ();
651- }
645+ assert isWriteFD ( fd ) ;
646+ LinkedBlockingQueue < Object > q = pipeData . get ( fd );
647+ if ( q == null ) {
648+ // the write end is already closed
649+ noFDHandler .run ();
650+ throw CompilerDirectives .shouldNotReachHere ();
651+ }
652+ int fd2 = getPairFd (fd );
653+ if (isClosed (fd2 )) {
654+ // the read end is already closed
655+ brokenPipeHandler . run ();
656+ throw CompilerDirectives . shouldNotReachHere ();
652657 }
653658 q .add (bytes );
654659 }
655660
661+ /**
662+ * Closing the read end of a pipe just removes the mapping from that fd to the queue.
663+ * Closing the write end adds the {@link #SENTINEL} value as the last value. There is a
664+ * potential race here for incorrect code that concurrently writes to the write end via
665+ * {@link #addPipeData}, in that the sentinel may prevent writes from being visible.
666+ */
656667 @ TruffleBoundary
657668 public void closePipe (int fd ) {
658- synchronized (pipeData ) {
659- pipeData .remove (fd );
669+ LinkedBlockingQueue <Object > q = pipeData .remove (fd );
670+ if (q != null && isWriteFD (fd )) {
671+ q .offer (SENTINEL );
660672 }
661673 }
662674
675+ /**
676+ * This needs no additional synchronization, since if the write-end of the pipe is already
677+ * closed, the {@link #take} call will return appropriately.
678+ */
663679 @ TruffleBoundary
664680 public Object takePipeData (Node node , int fd , Runnable noFDHandler ) {
665- LinkedBlockingQueue <Object > q ;
666- synchronized (pipeData ) {
667- q = pipeData .get (fd );
668- if (q == null ) {
669- noFDHandler .run ();
670- throw CompilerDirectives .shouldNotReachHere ();
671- }
672- int fd2 = getPairFd (fd );
673- if (isClosed (fd2 )) {
674- if (q .isEmpty ()) {
675- return PythonUtils .EMPTY_BYTE_ARRAY ;
676- }
677- }
681+ LinkedBlockingQueue <Object > q = pipeData .get (fd );
682+ if (q == null ) {
683+ noFDHandler .run ();
684+ throw CompilerDirectives .shouldNotReachHere ();
678685 }
679686 Object [] o = new Object []{PNone .NONE };
680687 TruffleSafepoint .setBlockedThreadInterruptible (node , (lbq ) -> {
681- o [0 ] = lbq . take ();
688+ o [0 ] = take (lbq );
682689 }, q );
683690 return o [0 ];
684691 }
685692
693+ /**
694+ * This uses LinkedBlockingQueue#compute to determine the blocking state. The runnable may
695+ * be run multiple times, so we need to check and write all possible results to the result
696+ * array. This ensures that if there is concurrent modification of the {@link #pipeData}, we
697+ * will get a valid result.
698+ */
686699 @ TruffleBoundary
687700 public boolean isBlocking (int fd ) {
688- LinkedBlockingQueue <Object > q ;
689- synchronized (pipeData ) {
690- q = pipeData .get (fd );
701+ boolean [] result = new boolean []{false };
702+ pipeData .compute (fd , (f , q ) -> {
691703 if (q == null ) {
692- return false ;
693- }
694- int fd2 = getPairFd (fd );
695- if (isClosed (fd2 )) {
696- return false ;
704+ result [0 ] = false ;
705+ } else {
706+ int fd2 = getPairFd (fd );
707+ if (isClosed (fd2 )) {
708+ result [0 ] = false ;
709+ } else {
710+ // this uses q.isEmpty() instead of our isEmpty(q), because we are not
711+ // interested in the race between closing fd2 and this runnable. If the
712+ // SENTINEL is pushed in the meantime, we should return false, just as if
713+ // we had observed fd2 to be closed already.
714+ result [0 ] = q .isEmpty ();
715+ }
697716 }
698- }
699- return q .isEmpty ();
717+ return q ;
718+ });
719+ return result [0 ];
700720 }
701721
702722 private static int getPairFd (int fd ) {
703- return fd % 2 == 0 ? fd + 1 : fd - 1 ;
723+ return isWriteFD (fd ) ? fd + 1 : fd - 1 ;
724+ }
725+
726+ private static boolean isWriteFD (int fd ) {
727+ return fd % 2 == 0 ;
728+ }
729+
730+ private static Object take (LinkedBlockingQueue <Object > q ) throws InterruptedException {
731+ Object v = q .take ();
732+ if (v == SENTINEL ) {
733+ q .offer (SENTINEL );
734+ return PythonUtils .EMPTY_BYTE_ARRAY ;
735+ } else {
736+ return v ;
737+ }
704738 }
705739
706740 private boolean isClosed (int fd ) {
707- return pipeData .get (fd ) == null && fd >= fdCounter ;
741+ // since there is no way that any thread can be trying to read/write to this pipe FD
742+ // legally before it was added to pipeData in #pipe above, we don't need to
743+ // synchronize. If the FD is taken, and it's not in pipe data, this is a race in the
744+ // program, because some thread is just arbitrarily probing FDs.
745+ return fd >= fdCounter .get () && pipeData .get (fd ) == null ;
708746 }
709747
710748 /**
0 commit comments