import multiprocessing
def _parallel_compute_function(x):
return (x[0])(*(x[1]),**(x[2]))(x[3],x[4])
class PipeSegment:
    """
    Base class for one stage of a pipeline.

    A segment obtains its input by calling its ``feeder`` (the upstream
    segment), runs that input through ``transform`` and caches the result,
    so each segment is evaluated at most once until it is reset.
    Segments are chained with ``*`` or ``|`` (``a * b`` makes ``a`` the
    input of ``b`` and returns ``b``) and combined side by side with ``+``
    (which builds a MergeSegment).  Non-segment data on the left of
    ``*`` / ``|`` is wrapped in a LoadSegment.
    """
    def __init__(self):
        self.feeder = None       # upstream segment that supplies our input
        self.procout = None      # cached output of the last evaluation
        self.procstart = False   # evaluation has begun (cycle detection)
        self.procfinish = False  # procout holds a finished result
        self._cited = 0          # number of downstream inputs fed by us
        self._used = 0           # times a downstream segment consumed us
        self._saveall = 0        # nonzero: never clear upstream caches
        self._verbose = 0        # verbosity level forwarded to printout

    def __call__(self, saveall=0, verbose=0):
        """
        Evaluate this segment if needed and return its (cached) output.

        saveall: if 0, an upstream segment's cached output is cleared once
            all of its downstream consumers have used it.
        verbose: verbosity level for ``printout`` (0 = silent).
        Raises Exception if the segment is re-entered while its own
        evaluation is still in progress (a cycle in the workflow).
        """
        self._saveall = saveall
        self._verbose = verbose
        if self.procstart and not self.procfinish:
            raise Exception('(!) Circular dependency in workflow.')
        if not self.procfinish:
            self.procstart = True
            self.procout = self.process()
            self.procfinish = True
        return self.procout

    def process(self):
        """
        Pull the input from the feeder and return ``transform(input)``.
        """
        pin = self.feeder(self._saveall, self._verbose)
        self.feeder._used += 1
        # Once every consumer of the feeder has used it, drop the feeder's
        # cached output (unless saveall) so intermediate results are freed.
        if self._saveall == 0 and self.feeder._used == self.feeder._cited:
            self.feeder.reset(recursive=False)
        if self._verbose > 0:
            self.printout(self._verbose, pin)
        return self.transform(pin)

    def transform(self, pin):
        """Map input to output.  The base class passes input through unchanged."""
        return pin

    def reset(self, recursive=True):
        """
        Clear the cached output so that the next call re-evaluates.

        With recursive=True, upstream segments are reset as well.  The
        recursion stops at a segment that has no feeder.
        """
        self.procout = None
        self.procstart = False
        self.procfinish = False
        if recursive and self.feeder is not None:
            self.feeder.reset(recursive=True)

    def printout(self, verbose, *args):
        """
        Print debug information: the type (verbose >= 1), the instance
        attributes (>= 2) and each of ``args`` (>= 3).
        """
        if verbose >= 1:
            print(type(self))
        if verbose >= 2:
            print(vars(self))
        if verbose >= 3:
            for x in args:
                print(x)
        if verbose >= 2:
            print()

    def selfstring(self, offset=0):
        """Return the class name on its own line, indented by 2*offset spaces."""
        return ' '*2*offset + type(self).__name__ + '\n'

    def __str__(self, offset=0):
        """Render this segment and everything upstream of it as an indented tree."""
        text = self.selfstring(offset)
        # A segment with no feeder is a leaf of the tree.
        if self.feeder is not None:
            text += self.feeder.__str__(offset+1)
        return text

    def attach_check(self, ps):
        """Attach ``ps`` as input (see ``attach``).  Raise if no input is free."""
        if not self.attach(ps):
            raise Exception('(!) ' + type(self).__name__
                            + ' has no free input at which to attach '
                            + type(ps).__name__ + '.')

    def attach(self, ps):
        """
        Attach ``ps`` at the free input found by walking up the feeders.

        Returns True if ``ps`` was attached (or ``ps`` is this segment),
        and False otherwise.
        """
        if self.feeder is None:
            self.feeder = ps
            self.feeder._cited += 1
            return True
        else:
            return self.feeder.attach(ps) or ps is self

    def __mul__(self, other):
        """``self * other``: feed this segment into ``other``.  Returns ``other``."""
        other.attach_check(self)
        return other

    def __or__(self, other):
        """``self | other``: same as ``self * other``."""
        other.attach_check(self)
        return other

    def __add__(self, other):
        """``self + other``: run both segments and concatenate their outputs."""
        return MergeSegment(self, other)

    def __rmul__(self, other):
        """``data * self``: wrap ``data`` in a LoadSegment and feed it in."""
        return LoadSegment(other) * self

    def __ror__(self, other):
        """``data | self``: same as ``data * self``."""
        return LoadSegment(other) * self

    @classmethod
    def parallel(cls, input_args=None, input_kwargs=None, processes=None,
                 saveall=0, verbose=0):
        """
        Run several independent instances of this class in a process pool.

        Job i constructs ``cls(*input_args[i], **input_kwargs[i])`` and
        calls it with ``(saveall, verbose)``.  If only one of input_args /
        input_kwargs is given, the other defaults to empty for every job.
        If neither is given, a single job with no arguments is run.  Jobs
        are paired with zip, so surplus entries of the longer list are
        ignored.  ``processes`` is passed to multiprocessing.Pool.
        Returns the list of job outputs, in job order.
        """
        if input_args is not None and input_kwargs is None:
            input_kwargs = [{}] * len(input_args)
        elif input_kwargs is not None and input_args is None:
            input_args = [[]] * len(input_kwargs)
        elif input_args is None and input_kwargs is None:
            input_args = [[]]
            input_kwargs = [{}]
        all_inputs = list(zip([cls]*len(input_args), input_args, input_kwargs,
                              [saveall]*len(input_args),
                              [verbose]*len(input_args)))
        # Alternative start method, currently disabled:
        # with multiprocessing.get_context('spawn').Pool(processes) as pool:
        with multiprocessing.Pool(processes) as pool:
            return pool.map(_parallel_compute_function, all_inputs)
class LoadSegment(PipeSegment):
    """
    Source segment: it has no upstream input and emits ``load()``.

    By default ``load`` returns the ``source`` value given to the
    constructor.  Subclasses may override ``load`` to produce data some
    other way.
    """
    def __init__(self, source=None):
        super().__init__()
        self.source = source

    def process(self):
        # There is no feeder to pull from, so go straight to load().
        if self._verbose > 0:
            self.printout(self._verbose)
        return self.load()

    def load(self):
        return self.source

    def reset(self, recursive=True):
        # Nothing upstream to recurse into.  `recursive` is accepted only
        # to keep the PipeSegment.reset signature.
        self.procout, self.procstart, self.procfinish = None, False, False

    def __str__(self, offset=0):
        # A source is always a leaf of the printed pipeline tree.
        return self.selfstring(offset)

    def attach(self, ps):
        # A source has no input, so nothing else can be attached upstream.
        return ps is self
class MergeSegment(PipeSegment):
    """
    Evaluate two segments and concatenate their outputs into one tuple.

    This is built by ``a + b``.  A non-tuple output is treated as a
    1-tuple, so two scalar outputs become ``(out1, out2)``.  Tuple
    outputs are concatenated.
    """
    def __init__(self, feeder1, feeder2):
        super().__init__()
        self.feeder1 = feeder1
        self.feeder1._cited += 1
        self.feeder2 = feeder2
        self.feeder2._cited += 1

    def process(self):
        p1 = self.feeder1(self._saveall, self._verbose)
        p2 = self.feeder2(self._saveall, self._verbose)
        # Free each feeder's cached output once all of its consumers have
        # used it (unless saveall).  Both feeders are evaluated first, so
        # that `a + a` does not reset `a` between the two calls.
        self.feeder1._used += 1
        if self._saveall == 0 and self.feeder1._used == self.feeder1._cited:
            self.feeder1.reset(recursive=False)
        self.feeder2._used += 1
        if self._saveall == 0 and self.feeder2._used == self.feeder2._cited:
            self.feeder2.reset(recursive=False)
        if self._verbose > 0:
            self.printout(self._verbose, p1, p2)
        if not isinstance(p1, tuple):
            p1 = (p1,)
        if not isinstance(p2, tuple):
            p2 = (p2,)
        return p1 + p2

    def reset(self, recursive=True):
        """Clear the cached output.  With recursive=True, reset both branches too."""
        self.procout = None
        self.procstart = False
        self.procfinish = False
        if recursive:
            self.feeder1.reset(recursive=True)
            self.feeder2.reset(recursive=True)

    def __str__(self, offset=0):
        return self.selfstring(offset) \
            + self.feeder1.__str__(offset+1) \
            + self.feeder2.__str__(offset+1)

    def attach(self, ps):
        """
        Attach ``ps`` to the free inputs of BOTH branches.

        Both branches are always visited, so the same upstream segment
        feeds each of them.  Returns True if ``ps`` was attached anywhere
        (or ``ps`` is this segment).
        """
        if self.feeder1 is None:
            self.feeder1 = ps
            self.feeder1._cited += 1
            flag1 = True
        else:
            flag1 = self.feeder1.attach(ps)
        if self.feeder2 is None:
            self.feeder2 = ps
            self.feeder2._cited += 1
            flag2 = True
        else:
            flag2 = self.feeder2.attach(ps)
        return flag1 or flag2 or ps is self
class SelectItem(PipeSegment):
    """
    Pick one element out of the piped-in, subscriptable input.

    This is handy for extracting a single output from a segment that
    returns a tuple of outputs.  ``index`` is applied as a plain
    subscript (``pin[index]``), so negative indices work as well.
    """
    def __init__(self, index=0):
        super().__init__()
        self.index = index

    def transform(self, pin):
        selected = pin[self.index]
        return selected
class Identity(PipeSegment):
    """
    Pass-through segment: its output is its piped input, unchanged.

    It adds nothing to PipeSegment.  The separate name exists to make the
    pass-through intent explicit, and it is the identity element for
    '*' chaining.
    """
class ReturnEmpty(PipeSegment):
    """
    Ignore the piped input and always output an empty tuple.

    This is useful as a "do nothing" branch in Map and Conditional.
    Because '+' concatenates tuple outputs, the empty tuple makes this
    the identity element for '+'.
    """
    def transform(self, pin):
        return tuple()
class Conditional(PipeSegment):
    """
    This is the pipesegment version of an if statement.

    The piped input is fed into a new 'condition_class' object.  If that
    returns a truthy value, the input is fed through a new 'if_class'
    object.  Otherwise it is fed through a new 'else_class' object.  The
    output of the chosen branch is returned.

    The *_args / *_kwargs parameters are the constructor arguments for
    the matching class (None means no arguments).  Constructed objects
    that are LoadSegment instances are run directly instead of receiving
    the piped input.  If all three classes derive from LoadSegment, the
    segment feeds itself an empty tuple so it can run without any
    upstream input.
    """
    def __init__(self, condition_class,
                 if_class=Identity, else_class=ReturnEmpty,
                 condition_args=None, if_args=None, else_args=None,
                 condition_kwargs=None, if_kwargs=None, else_kwargs=None):
        super().__init__()
        self.condition_class = condition_class
        self.if_class = if_class
        self.else_class = else_class
        # None defaults instead of [] / {}: a mutable default is created
        # only once and would be shared, via these attributes, by every
        # instance that relied on the default.
        self.condition_args = [] if condition_args is None else condition_args
        self.if_args = [] if if_args is None else if_args
        self.else_args = [] if else_args is None else else_args
        self.condition_kwargs = ({} if condition_kwargs is None
                                 else condition_kwargs)
        self.if_kwargs = {} if if_kwargs is None else if_kwargs
        self.else_kwargs = {} if else_kwargs is None else else_kwargs
        if issubclass(self.condition_class, LoadSegment) \
                and issubclass(self.if_class, LoadSegment) \
                and issubclass(self.else_class, LoadSegment):
            self.feeder = LoadSegment(())

    def transform(self, pin):
        condition_obj = self.condition_class(*self.condition_args,
                                             **self.condition_kwargs)
        if not isinstance(condition_obj, LoadSegment):
            condition_obj = LoadSegment(pin) * condition_obj
        if condition_obj(self._saveall, self._verbose):
            inner_obj = self.if_class(*self.if_args, **self.if_kwargs)
        else:
            inner_obj = self.else_class(*self.else_args, **self.else_kwargs)
        if not isinstance(inner_obj, LoadSegment):
            inner_obj = LoadSegment(pin) * inner_obj
        return inner_obj(self._saveall, self._verbose)
class Map(PipeSegment):
    """
    This is the pipesegment version of a for-loop.

    Each element of the piped-in iterable goes through its own freshly
    constructed ``inner_class(*args, **kwargs)`` object.  All outputs are
    gathered into one tuple: a tuple output contributes each of its
    items, and any other output contributes itself as a single item.
    """
    def __init__(self, inner_class, *args, **kwargs):
        super().__init__()
        self.inner_class = inner_class
        self.args = args
        self.kwargs = kwargs

    def transform(self, pin):
        collected = []
        for element in pin:
            source = LoadSegment(element)
            segment = self.inner_class(*self.args, **self.kwargs)
            result = (source * segment)(self._saveall, self._verbose)
            if isinstance(result, tuple):
                collected.extend(result)
            else:
                collected.append(result)
        return tuple(collected)
class While(PipeSegment):
    """
    This is the pipesegment version of a while-loop.

    While feeding the current value through a new 'condition_class'
    object gives a truthy result, the value is replaced by the output of
    feeding it through a new 'inner_class' object.  The final value is
    returned.  If the condition is false at the start, this is the input
    itself.

    The *_args / *_kwargs parameters are the constructor arguments for
    the matching class (None means no arguments).
    """
    def __init__(self, condition_class, inner_class,
                 condition_args=None, inner_args=None,
                 condition_kwargs=None, inner_kwargs=None):
        super().__init__()
        self.condition_class = condition_class
        self.inner_class = inner_class
        # None defaults instead of [] / {}: a mutable default is created
        # only once and would be shared by every instance using it.
        self.condition_args = [] if condition_args is None else condition_args
        self.inner_args = [] if inner_args is None else inner_args
        self.condition_kwargs = ({} if condition_kwargs is None
                                 else condition_kwargs)
        self.inner_kwargs = {} if inner_kwargs is None else inner_kwargs

    def transform(self, pin):
        while self._condition_holds(pin):
            inner_obj = self.inner_class(*self.inner_args,
                                         **self.inner_kwargs)
            pin = (LoadSegment(pin) * inner_obj)(self._saveall, self._verbose)
        return pin

    def _condition_holds(self, pin):
        """Evaluate a fresh condition object on ``pin`` and return its output."""
        # A new object is needed on every test: a segment caches its first
        # output, and its input slot is already taken after one use.
        condition_obj = self.condition_class(*self.condition_args,
                                             **self.condition_kwargs)
        return (LoadSegment(pin) * condition_obj)(self._saveall, self._verbose)
class PipeArgs(PipeSegment):
    """
    Wrapper for any PipeSegment subclass which enables it to accept
    initialization arguments from piped input.

    The piped input is treated as a tuple; a non-tuple input counts as a
    1-tuple.  For an ordinary segment class, element 0 is the data piped
    into the new object and the remaining elements are constructor
    arguments.  For a LoadSegment subclass, which takes no piped data,
    every element is a constructor argument.  Dict elements are merged
    into the keyword arguments.  All other elements are appended to the
    positional arguments, after the ``*args`` given here.
    """
    def __init__(self, inner_class, *args, **kwargs):
        super().__init__()
        self.inner_class = inner_class
        self.args = args
        self.kwargs = kwargs

    def transform(self, pin):
        # Normalize once, so that the piped data and the constructor
        # arguments come from the same sequence.  A non-tuple pin is one
        # single item; it must not be indexed into.
        items = pin if isinstance(pin, tuple) else (pin,)
        isloadsegment = issubclass(self.inner_class, LoadSegment)
        if isloadsegment:
            argstart = 0
        else:
            argstart = 1
            inner_pin = items[0]
        # Gather all initialization arguments
        args = self.args
        kwargs = self.kwargs.copy()
        for p in items[argstart:]:
            if isinstance(p, dict):
                kwargs.update(p)
            else:
                args = args + (p,)
        # Initialize and call object
        obj = self.inner_class(*args, **kwargs)
        if isloadsegment:
            return obj(self._saveall, self._verbose)
        return (LoadSegment(inner_pin) * obj)(self._saveall, self._verbose)
class FunctionPipe(PipeSegment):
    """
    Adapter that turns a one-argument callable into a PipeSegment.

    The piped input is passed to ``function``, and its return value
    becomes the segment's output.
    """
    def __init__(self, function):
        super().__init__()
        self.function = function

    def transform(self, pin):
        func = self.function
        return func(pin)
def PipeFunction(inner_class=PipeSegment, pin=(), *args, **kwargs):
    """
    Turns a PipeSegment into a standalone function.

    inner_class: the PipeSegment class (or factory) to run.
    pin: the input to pipe into it.  It is ignored when inner_class
        produces a LoadSegment, which generates its own data.  It may
        also be an upstream PipeSegment, which is then chained in.
    *args, **kwargs: sent to the PipeSegment's constructor.
    Returns the segment's output.
    """
    psobject = inner_class(*args, **kwargs)
    if isinstance(psobject, LoadSegment):
        # A source segment has no input slot to pipe into.
        return psobject()
    if not isinstance(pin, PipeSegment):
        # Wrap plain data explicitly rather than relying on `pin * psobject`
        # reaching PipeSegment.__rmul__: types whose own __mul__ accepts the
        # operand (e.g. numpy arrays, which multiply elementwise) would
        # never call it.
        pin = LoadSegment(pin)
    return (pin * psobject)()