1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
#![allow(dead_code)]

//! The `ParallelIterator` module makes it easy to write parallel
//! programs using an iterator-style interface. To get access to all
//! the methods you want, the easiest is to write `use
//! rayon::prelude::*;` at the top of your module, which will import
//! the various traits and methods you need.
//!
//! The submodules of this module mostly just contain implementation
//! details of little interest to an end-user. If you'd like to read
//! the code itself, the `internal` module and `README.md` file are a
//! good place to start.

use std::f64;
use std::ops::Fn;
use self::collect::collect_into;
use self::enumerate::Enumerate;
use self::filter::Filter;
use self::filter_map::FilterMap;
use self::flat_map::FlatMap;
use self::map::Map;
use self::reduce::{reduce, ReduceOp, SumOp, MulOp, MinOp, MaxOp, ReduceWithOp,
                   ReduceWithIdentityOp, SUM, MUL, MIN, MAX};
use self::internal::*;
use self::weight::Weight;
use self::zip::ZipIter;

pub mod collect;
pub mod enumerate;
pub mod filter;
pub mod filter_map;
pub mod flat_map;
pub mod internal;
pub mod len;
pub mod for_each;
#[cfg(feature = "unstable")]
pub mod fold;
pub mod reduce;
pub mod slice;
pub mod slice_mut;
pub mod map;
pub mod weight;
pub mod zip;
pub mod range;
pub mod vec;

#[cfg(test)]
mod test;

/// Conversion of a value into a `ParallelIterator`.
///
/// Every `ParallelIterator` also implements this trait, converting into
/// itself, so APIs may accept `IntoParallelIterator` and still take
/// ready-made parallel iterators.
pub trait IntoParallelIterator {
    /// The type of item yielded by the resulting parallel iterator.
    type Item: Send;
    /// The concrete parallel iterator returned by `into_par_iter`.
    type Iter: ParallelIterator<Item = Self::Item>;

    /// Consumes `self`, returning a parallel iterator over its items.
    fn into_par_iter(self) -> Self::Iter;
}

/// Borrowing conversion into a `ParallelIterator` over shared references.
///
/// `par_iter` borrows `self` for `'data` and yields `&'data Self::Item`.
/// Parallel iterator items must be `Send`, and `&T` is `Send` only when
/// `T: Sync`, hence the `Sync` bound on `Item`.
pub trait IntoParallelRefIterator<'data> {
    /// The referent type; the iterator yields `&'data Item`.
    type Item: Sync + 'data;
    /// The concrete parallel iterator returned by `par_iter`.
    type Iter: ParallelIterator<Item = &'data Self::Item>;

    /// Returns a parallel iterator over shared references into `self`.
    fn par_iter(&'data self) -> Self::Iter;
}

/// Borrowing conversion into a `ParallelIterator` over mutable references.
///
/// `par_iter_mut` mutably borrows `self` for `'data` and yields
/// `&'data mut Self::Item`. Parallel iterator items must be `Send`, and
/// `&mut T` is `Send` only when `T: Send`, hence the bound on `Item`.
pub trait IntoParallelRefMutIterator<'data> {
    /// The referent type; the iterator yields `&'data mut Item`.
    type Item: Send + 'data;
    /// The concrete parallel iterator returned by `par_iter_mut`.
    type Iter: ParallelIterator<Item = &'data mut Self::Item>;

    /// Returns a parallel iterator over mutable references into `self`.
    fn par_iter_mut(&'data mut self) -> Self::Iter;
}

/// The `ParallelIterator` interface.
///
/// Adapter methods such as `map`, `filter` and `weight` wrap `self` in a
/// new iterator type. Methods such as `for_each`, `reduce_with` and `sum`
/// consume the iterator and produce a result.
pub trait ParallelIterator: Sized {
    /// The type of item that this parallel iterator produces.
    type Item: Send;

    /// Indicates the relative "weight" of producing each item in this
    /// parallel iterator. A higher weight will cause finer-grained
    /// parallel subtasks. 1.0 indicates something very cheap and
    /// uniform, like copying a value out of an array, or computing `x
    /// + 1`. If your tasks are either very expensive, or very
    /// unpredictable, you are better off with higher values. See also
    /// `weight_max`, which is a convenient shorthand to force the
    /// finest grained parallel execution possible. Tuning this value
    /// should not affect correctness but can improve (or hurt)
    /// performance.
    fn weight(self, scale: f64) -> Weight<Self> {
        Weight::new(self, scale)
    }

    /// Shorthand for `self.weight(f64::INFINITY)`. This forces the
    /// smallest granularity of parallel execution, which makes sense
    /// when your parallel tasks are (potentially) very expensive to
    /// execute.
    fn weight_max(self) -> Weight<Self> {
        self.weight(f64::INFINITY)
    }

    /// Executes `op` on each item produced by the iterator, in parallel.
    fn for_each<OP>(self, op: OP)
        where OP: Fn(Self::Item) + Sync
    {
        for_each::for_each(self, &op)
    }

    /// Applies `map_op` to each item of this iterator, producing a new
    /// iterator with the results.
    fn map<MAP_OP,R>(self, map_op: MAP_OP) -> Map<Self, MAP_OP>
        where MAP_OP: Fn(Self::Item) -> R
    {
        Map::new(self, map_op)
    }

    /// Applies `filter_op` to each item of this iterator, producing a
    /// new iterator with only the items for which `filter_op` returned
    /// `true`.
    fn filter<FILTER_OP>(self, filter_op: FILTER_OP) -> Filter<Self, FILTER_OP>
        where FILTER_OP: Fn(&Self::Item) -> bool
    {
        Filter::new(self, filter_op)
    }

    /// Applies `filter_op` to each item of this iterator to get an
    /// `Option`, producing a new iterator with only the values from
    /// `Some` results.
    fn filter_map<FILTER_OP,R>(self, filter_op: FILTER_OP) -> FilterMap<Self, FILTER_OP>
        where FILTER_OP: Fn(Self::Item) -> Option<R>
    {
        FilterMap::new(self, filter_op)
    }

    /// Applies `map_op` to each item of this iterator to get a nested
    /// parallel iterator, producing a new iterator that flattens those
    /// nested iterators back into one.
    fn flat_map<MAP_OP,PI>(self, map_op: MAP_OP) -> FlatMap<Self, MAP_OP>
        where MAP_OP: Fn(Self::Item) -> PI, PI: ParallelIterator
    {
        FlatMap::new(self, map_op)
    }

    /// Reduces the items in the iterator into one item using `op`.
    /// See also `sum`, `mul`, `min`, etc, which are slightly more
    /// efficient. Returns `None` if the iterator is empty.
    ///
    /// Note: unlike in a sequential iterator, the order in which `op`
    /// will be applied to reduce the result is not specified. So `op`
    /// should be commutative and associative or else the results will
    /// be non-deterministic.
    fn reduce_with<OP>(self, op: OP) -> Option<Self::Item>
        where OP: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
    {
        // Lift every item into `Some` so that empty sub-sequences can
        // be represented (as `None`) during the reduction.
        reduce(self.map(Some), &ReduceWithOp::new(&op))
    }

    /// Reduces the items in the iterator into one item using `op`.
    /// The argument `identity` represents an "identity" value which
    /// may be inserted into the sequence as needed to create
    /// opportunities for parallel execution. So, for example, if you
    /// are doing a summation, then `identity` ought to be something
    /// that represents the zero for your type (but consider just
    /// calling `sum()` in that case).
    ///
    /// Example `vectors.par_iter().reduce_with_identity(Vector::zero(), Vector::add)`.
    ///
    /// Note: unlike in a sequential iterator, the order in which `op`
    /// will be applied to reduce the result is not specified. So `op`
    /// should be commutative and associative or else the results will
    /// be non-deterministic. And of course `identity` should be a
    /// true identity.
    fn reduce_with_identity<OP>(self, identity: Self::Item, op: OP) -> Self::Item
        where OP: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
              Self::Item: Clone + Sync,
    {
        reduce(self, &ReduceWithIdentityOp::new(&identity, &op))
    }

    /// A variant on the typical `map/reduce` pattern. Parallel fold
    /// is similar to sequential fold except that the sequence of
    /// items may be subdivided before it is folded. The resulting
    /// values are then reduced together using `reduce_op`.  Typically
    /// `fold_op` and `reduce_op` will be doing the same conceptual
    /// operation, but on different types, or with a different twist.
    ///
    /// Here is how to visualize what is happening. Imagine an input
    /// sequence with 7 values as shown:
    ///
    /// ```text
    /// [ 0 1 2 3 4 5 6 ]
    ///   |     | |   |
    ///   +--X--+ +-Y-+ // <-- fold_op
    ///      |      |
    ///      +---Z--+   // <-- reduce_op
    /// ```
    ///
    /// These values will be first divided into contiguous chunks of
    /// some size (the precise sizes will depend on how many cores are
    /// present and how active they are). These are folded using
    /// `fold_op`. Here, the chunk `[0, 1, 2, 3]` was folded into `X`
    /// and the chunk `[4, 5, 6]` was folded into `Y`. Note that `X`
    /// and `Y` may, in general, have different types than the
    /// original input sequence. Now the results from these folds are
    /// themselves *reduced* using `reduce_op` (again, in some
    /// unspecified order). So now `X` and `Y` are reduced to `Z`,
    /// which is the final result. Note that `reduce_op` must consume
    /// and produce values of the same type.
    ///
    /// Note that `fold` can always be expressed using map/reduce. For
    /// example, a call `self.fold(identity, fold_op, reduce_op)` could
    /// also be expressed as follows:
    ///
    /// ```ignore
    /// self.map(|elem| fold_op(identity.clone(), elem))
    ///     .reduce_with_identity(identity, reduce_op)
    /// ```
    ///
    /// This is equivalent to an execution of `fold` where the
    /// subsequences that were folded sequentially wind up being of
    /// length 1.  However, this would rarely happen in practice,
    /// typically the subsequences would be larger, and hence a call
    /// to `fold` *can* be more efficient than map/reduce,
    /// particularly if the `fold_op` is more efficient when applied
    /// to a large sequence.
    ///
    /// **This method is marked as unstable** because it is
    /// particularly likely to change its name and/or signature, or go
    /// away entirely.
    #[cfg(feature = "unstable")]
    fn fold<I,FOLD_OP,REDUCE_OP>(self,
                                 identity: I,
                                 fold_op: FOLD_OP,
                                 reduce_op: REDUCE_OP)
                                 -> I
        where FOLD_OP: Fn(I, Self::Item) -> I + Sync,
              REDUCE_OP: Fn(I, I) -> I + Sync,
              I: Clone + Sync + Send,
    {
        fold::fold(self, &identity, &fold_op, &reduce_op)
    }

    /// Sums up the items in the iterator.
    ///
    /// Note that the order in which items will be reduced is not
    /// specified, so if the `+` operator is not truly commutative and
    /// associative (as is the case for floating point numbers), then
    /// the results are not fully deterministic.
    fn sum(self) -> Self::Item
        where SumOp: ReduceOp<Self::Item>
    {
        reduce(self, SUM)
    }

    /// Multiplies all the items in the iterator.
    ///
    /// Note that the order in which items will be reduced is not
    /// specified, so if the `*` operator is not truly commutative and
    /// associative (as is the case for floating point numbers), then
    /// the results are not fully deterministic.
    fn mul(self) -> Self::Item
        where MulOp: ReduceOp<Self::Item>
    {
        reduce(self, MUL)
    }

    /// Computes the minimum of all the items in the iterator.
    ///
    /// Note that the order in which items will be reduced is not
    /// specified, so if the `Ord` impl is not truly commutative and
    /// associative (as is the case for floating point numbers), then
    /// the results are not deterministic.
    fn min(self) -> Self::Item
        where MinOp: ReduceOp<Self::Item>
    {
        reduce(self, MIN)
    }

    /// Computes the maximum of all the items in the iterator.
    ///
    /// Note that the order in which items will be reduced is not
    /// specified, so if the `Ord` impl is not truly commutative and
    /// associative (as is the case for floating point numbers), then
    /// the results are not deterministic.
    fn max(self) -> Self::Item
        where MaxOp: ReduceOp<Self::Item>
    {
        reduce(self, MAX)
    }

    /// Reduces the items using the given "reduce operator". You may
    /// prefer `reduce_with` for a simpler interface.
    ///
    /// Note that the order in which items will be reduced is not
    /// specified, so if the `reduce_op` impl is not truly commutative
    /// and associative, then the results are not deterministic.
    fn reduce<REDUCE_OP>(self, reduce_op: &REDUCE_OP) -> Self::Item
        where REDUCE_OP: ReduceOp<Self::Item>
    {
        // Unqualified `reduce` names the free function imported from
        // `self::reduce`, not this method.
        reduce(self, reduce_op)
    }

    /// Internal method used to define the behavior of this parallel
    /// iterator. You should not need to call this directly.
    #[doc(hidden)]
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
        where C: UnindexedConsumer<Self::Item>;
}

/// Every parallel iterator is trivially convertible into a parallel
/// iterator: `into_par_iter` simply returns `self` unchanged.
impl<T> IntoParallelIterator for T
    where T: ParallelIterator
{
    type Item = T::Item;
    type Iter = T;

    fn into_par_iter(self) -> Self::Iter {
        self
    }
}

/// A parallel iterator that cannot say exactly how many items it will
/// produce, but can give an upper bound on that number. Iterators
/// produced by filtering typically fall into this category.
pub trait BoundedParallelIterator: ParallelIterator {
    /// Returns an upper bound on the number of items this iterator
    /// will produce.
    fn upper_bound(&mut self) -> usize;

    /// Internal method used to define the behavior of this parallel
    /// iterator. You should not need to call this directly.
    #[doc(hidden)]
    fn drive<'c, C>(self, consumer: C) -> C::Result
        where C: Consumer<Self::Item>;
}

/// A parallel iterator that knows precisely how many items it will
/// produce, as happens when iterating over a vector. Knowing the exact
/// count up front is very useful, e.g. for collecting into a
/// pre-sized buffer.
pub trait ExactParallelIterator: BoundedParallelIterator {
    /// Returns the exact number of items this iterator will produce,
    /// assuming no panic occurs.
    ///
    /// # Safety note
    ///
    /// Returning an incorrect value here could lead to **undefined
    /// behavior**. NOTE(review): this is a safe method, so that
    /// obligation rests entirely on implementors.
    fn len(&mut self) -> usize;

    /// Collects the results of the iterator into `target`.
    ///
    /// The vector is always truncated before execution begins, so any
    /// previous contents are discarded. Reusing the same vector across
    /// calls can improve performance, because its backing buffer is
    /// reused.
    fn collect_into(self, target: &mut Vec<Self::Item>) {
        // Delegates to the free function from the `collect` module.
        collect_into(self, target);
    }
}

/// A parallel iterator with "random access" to its data: it can be
/// split at arbitrary indices and items drawn from those points.
pub trait IndexedParallelIterator: ExactParallelIterator {
    /// Internal method to convert this parallel iterator into a
    /// producer that can be used to request the items. Users of the
    /// API never need to know about this fn.
    #[doc(hidden)]
    fn with_producer<CB>(self, callback: CB) -> CB::Output
        where CB: ProducerCallback<Self::Item>;

    /// Pairs the items of this iterator with those of `other`, yielding
    /// tuples `(A, B)` where each `A` comes from `self` and each `B` from
    /// `other`. As with `zip` on ordinary iterators, when the lengths
    /// differ you only get as many pairs as the shorter side provides.
    fn zip<ZIP_OP>(self, other: ZIP_OP) -> ZipIter<Self, ZIP_OP::Iter>
        where ZIP_OP: IntoParallelIterator,
              ZIP_OP::Iter: IndexedParallelIterator
    {
        let other_iter = other.into_par_iter();
        ZipIter::new(self, other_iter)
    }

    /// Pairs each item with its index.
    fn enumerate(self) -> Enumerate<Self> {
        Enumerate::new(self)
    }
}