@@ -41,6 +41,7 @@ class Watch implements Executable
4141 const FULL_DOCUMENT_DEFAULT = 'default ' ;
4242 const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup ' ;
4343
44+ private $ aggregate ;
4445 private $ databaseName ;
4546 private $ collectionName ;
4647 private $ pipeline ;
@@ -96,24 +97,8 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
9697 'readPreference ' => new ReadPreference (ReadPreference::RP_PRIMARY ),
9798 ];
9899
99- if (isset ($ options ['batchSize ' ]) && ! is_integer ($ options ['batchSize ' ])) {
100- throw InvalidArgumentException::invalidType ('"batchSize" option ' , $ options ['batchSize ' ], 'integer ' );
101- }
102-
103- if (isset ($ options ['collation ' ]) && ! is_array ($ options ['collation ' ]) && ! is_object ($ options ['collation ' ])) {
104- throw InvalidArgumentException::invalidType ('"collation" option ' , $ options ['collation ' ], 'array or object ' );
105- }
106-
107- if (isset ($ options ['maxAwaitTimeMS ' ]) && ! is_integer ($ options ['maxAwaitTimeMS ' ])) {
108- throw InvalidArgumentException::invalidType ('"maxAwaitTimeMS" option ' , $ options ['maxAwaitTimeMS ' ], 'integer ' );
109- }
110-
111- if (isset ($ options ['readConcern ' ]) && ! $ options ['readConcern ' ] instanceof ReadConcern) {
112- throw InvalidArgumentException::invalidType ('"readConcern" option ' , $ options ['readConcern ' ], 'MongoDB\Driver\ReadConcern ' );
113- }
114-
115- if (isset ($ options ['readPreference ' ]) && ! $ options ['readPreference ' ] instanceof ReadPreference) {
116- throw InvalidArgumentException::invalidType ('"readPreference" option ' , $ options ['readPreference ' ], 'MongoDB\Driver\ReadPreference ' );
100+ if (isset ($ options ['fullDocument ' ]) && ! is_string ($ options ['fullDocument ' ])) {
101+ throw InvalidArgumentException::invalidType ('"fullDocument" option ' , $ options ['fullDocument ' ], 'string ' );
117102 }
118103
119104 if (isset ($ options ['resumeAfter ' ])) {
@@ -127,6 +112,8 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
127112 $ this ->collectionName = (string ) $ collectionName ;
128113 $ this ->pipeline = $ pipeline ;
129114 $ this ->options = $ options ;
115+
116+ $ this ->aggregate = $ this ->createAggregate ();
130117 }
131118
132119 /**
@@ -141,57 +128,46 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
141128 */
142129 public function execute (Server $ server )
143130 {
144- $ command = $ this ->createCommand ();
145-
146- $ cursor = $ command ->execute ($ server );
131+ $ cursor = $ this ->aggregate ->execute ($ server );
147132
148133 return new ChangeStream ($ cursor , $ this ->createResumeCallable ());
149134 }
150135
151- private function createAggregateOptions ()
152- {
153- $ aggOptions = array_intersect_key ($ this ->options , ['batchSize ' => 1 , 'collation ' => 1 , 'maxAwaitTimeMS ' => 1 ]);
154- if ( ! $ aggOptions ) {
155- return [];
156- }
157- return $ aggOptions ;
158- }
159-
160- private function createChangeStreamOptions ()
161- {
162- $ csOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
163- if ( ! $ csOptions ) {
164- return [];
165- }
166- return $ csOptions ;
167- }
168-
169136 /**
170- * Create the aggregate pipeline with the changeStream command .
137+ * Create the aggregate command for creating a change stream .
171138 *
172- * @return Command
139+ * This method is also used to recreate the aggregate command if a new
140+ * resume token is provided while resuming.
141+ *
142+ * @return Aggregate
173143 */
174- private function createCommand ()
144+ private function createAggregate ()
175145 {
176- $ changeStreamArray = ['$changeStream ' => $ this ->createChangeStreamOptions ()];
177- array_unshift ($ this ->pipeline , $ changeStreamArray );
146+ $ changeStreamOptions = array_intersect_key ($ this ->options , ['fullDocument ' => 1 , 'resumeAfter ' => 1 ]);
147+ $ changeStream = ['$changeStream ' => (object ) $ changeStreamOptions ];
148+
149+ $ pipeline = $ this ->pipeline ;
150+ array_unshift ($ pipeline , $ changeStream );
178151
179- $ cmd = new Aggregate ($ this ->databaseName , $ this -> collectionName , $ this -> pipeline , $ this -> createAggregateOptions () );
152+ $ aggregateOptions = array_intersect_key ($ this ->options , [ ' batchSize ' => 1 , ' collation ' => 1 , ' maxAwaitTimeMS ' => 1 , ' readConcern ' => 1 , ' readPreference ' => 1 ] );
180153
181- return $ cmd ;
154+ return new Aggregate ( $ this -> databaseName , $ this -> collectionName , $ pipeline , $ aggregateOptions ) ;
182155 }
183156
184157 private function createResumeCallable ()
185158 {
186- array_shift ($ this ->pipeline );
187159 return function ($ resumeToken = null ) {
188- // Select a server from manager using read preference option
189- $ server = $ this ->manager ->selectServer ($ this ->options ['readPreference ' ]);
190- // Update $this->options['resumeAfter'] from $resumeToken arg
160+ /* If a resume token was provided, recreate the Aggregate operation
161+ * using the new resume token. */
191162 if ($ resumeToken !== null ) {
192163 $ this ->options ['resumeAfter ' ] = $ resumeToken ;
164+ $ this ->aggregate = $ this ->createAggregate ();
193165 }
194- // Return $this->execute() with the newly selected server
166+
167+ /* Select a new server using the read preference, execute this
168+ * operation on it, and return the new ChangeStream. */
169+ $ server = $ this ->manager ->selectServer ($ this ->options ['readPreference ' ]);
170+
195171 return $ this ->execute ($ server );
196172 };
197173 }
0 commit comments