ptera.probe

This module defines the probing functionality. The interface for probes is built on giving

ptera.probe.probing(*selectors, raw=False, probe_type=None, env=None, overridable=False)

Probe that can be used as a context manager.

Example:

>>> def f(x):
...     a = x * x
...     return a
>>> with probing("f > a").print():
...     f(4)  # Prints {"a": 16}
Parameters
  • selectors – The selector strings describing the variables to probe (at least one).

  • raw – Defaults to False. If True, produce a stream of Capture objects that contain extra information about the capture.

  • probe_type

    Either “immediate”, “total”, or None (the default).

    • If “immediate”, use Immediate.

    • If “total”, use Total.

    • If None, determine what to use based on whether the selector has a focus or not.

  • env – A dictionary that will be used to resolve symbols in the selector. If it is not provided, ptera will seek the locals and globals dictionaries of the scope where this function is called.

  • overridable – Whether to include the override/koverride methods.

ptera.probe.global_probe(*selectors, raw=False, probe_type=None, env=None)

Set a probe globally.

Example:

>>> def f(x):
...     a = x * x
...     return a
>>> probe = global_probe("f > a")
>>> probe["a"].print()
>>> f(4)  # Prints 16
Parameters
  • selectors – The selector strings describing the variables to probe (at least one).

  • raw – Defaults to False. If True, produce a stream of Capture objects that contain extra information about the capture.

  • probe_type

    Either “immediate”, “total”, or None (the default).

    • If “immediate”, use Immediate.

    • If “total”, use Total.

    • If None, determine what to use based on whether the selector has a focus or not.

  • env – A dictionary that will be used to resolve symbols in the selector. If it is not provided, ptera will seek the locals and globals dictionaries of the scope where this function is called.

class ptera.probe.Probe(*selectors, raw=False)

Observable which generates a stream of values from program variables.

Probes should be created with probing() or global_probe().

Note

In the documentation for some methods you may see calls to give() or given(), but that’s because they come from the documentation for the giving package (on top of which Probe is built).

give() is equivalent to what Ptera does when a variable of interest is set, given() yields an object that has the same interface as Probe (the superclass to Probe, in fact). Take variables named gv to be probes.

Parameters
  • selectors – The selector strings describing the variables to probe (at least one).

  • raw – Defaults to False. If True, produce a stream of Capture objects that contain extra information about the capture. Mostly relevant for advanced selectors such as f > $x:@Parameter which captures the value of any variable with the Parameter tag under the generic name “x”. When raw is True, the actual name of the variable is preserved in a Capture object associated to x.

  • probe_type

    Either “immediate”, “total”, or None (the default).

    • If “immediate”, use Immediate.

    • If “total”, use Total.

    • If None, determine what to use based on whether the selector has a focus or not.

  • env – A dictionary that will be used to resolve symbols in the selector. If it is not provided, ptera will seek the locals and globals dictionaries of the scope where this function is called.

accum(obj=None)

Accumulate into a list or set.

Parameters

obj – The object in which to accumulate, either a list or a set. If not provided, a new list is created.

Returns

The object in which the values will be accumulated.

allow_empty()

Suppresses SequenceContainsNoElementsError.

This can be chained to reduce(), min(), first(), etc. of an empty sequence to allow the output of these operations to be empty. Otherwise, these operations would raise rx.internal.exceptions.SequenceContainsNoElementsError.

breakpoint(*args, skip=[], **kwargs)

Trigger a breakpoint on every entry.

Parameters

skip – A list of globs corresponding to modules to skip during debugging, for example skip=["giving.*"] would skip all frames that are in the giving module.

breakword(*args, skip=[], **kwargs)

Trigger a breakpoint using breakword.

This feature requires the breakword package to be installed, and the tag() operator to be applied.

gvt = gv.tag()
gvt.display()
gvt.breakword()

The above will display words next to each entry. Set the BREAKWORD environment to one of these words to set a breakpoint when it is printed.

Parameters
  • skip – A list of globs corresponding to modules to skip during debugging, for example skip=["giving.*"] would skip all frames that are in the giving module.

  • word – Only trigger the breakpoint on the given word.

display(*, breakword=None, skip=[], **kwargs)

Pretty-print each element.

Parameters
  • colors – Whether to colorize the output or not.

  • time_format – How to format the time (if present), e.g. "%Y-%m-%d %H:%M:%S"

  • breakword – If not None, run self.breakword(word=breakword).

  • skip – If breakword is not None, pass skip to the debugger.

eval(fn, *args, **kwargs)

Run a function in the context of this Given and get the values.

def main():
    give(x=1)
    give(x=2)

values = given()["x"].eval(main)
assert values == [1, 2]
Parameters
  • fn – The function to run.

  • args – Positional arguments to pass to fn.

  • kwargs – Keyword arguments to pass to fn.

exec(fn, *args, **kwargs)

Run a function in the context of this Given.

def main():
    give(x=1)
    give(x=2)

gv = given()
gv["x"].print()
gv.exec(main)  # prints 1, 2
Parameters
  • fn – The function to run.

  • args – Positional arguments to pass to fn.

  • kwargs – Keyword arguments to pass to fn.

fail(*args, skip=[], **kwargs)

Raise an exception if the stream produces anything.

Parameters
  • message – The exception message (format).

  • exc_type – The exception type to raise. Will be passed the next data element, and the result is raised. Defaults to Failure.

  • skip – Modules to skip in the traceback. Defaults to “giving.*” and “rx.*”.

fail_if_empty(message=None, exc_type=<class 'giving.gvn.Failure'>, skip=['giving.*', 'rx.*'])

Raise an exception if the stream is empty.

Parameters

exc_type – The exception type to raise. Defaults to Failure.

fail_if_false(*args, skip=[], **kwargs)

Raise an exception if any element of the stream is falsey.

False, 0, [], etc. are falsey.

Parameters

exc_type – The exception type to raise. Defaults to Failure.

give(*keys, **extra)

Give each element.

This calls give() for each value in the stream.

Be careful using this method because it could easily lead to an infinite loop.

Parameters
  • keys – Key(s) under which to give the elements.

  • extra – Extra key/value pairs to give along with the rest.

ksubscribe(fn)

Subscribe a function called with keyword arguments.

Note

The function passed to ksubscribe is wrapped with lax_function(), so it is not necessary to add a **kwargs argument for keys that you do not need.

gv.ksubscribe(lambda x, y=None, z=None: print(x, y, z))
give(x=1, z=2, abc=3)  # Prints 1, None, 2
Parameters

fn – The function to call.

pipe(*ops)

Pipe one or more operators.

Returns: An ObservableProxy.

print(format=None, skip_missing=False)

Print each element of the stream.

Parameters
  • format – A format string as would be used with str.format.

  • skip_missing – Whether to ignore KeyErrors due to missing entries in the format.

subscribe(observer=None, on_next=None, on_error=None, on_completed=None)

Subscribe a function to this Observable stream.

with given() as gv:
    gv.subscribe(print)

    results = []
    gv["x"].subscribe(results.append)

    give(x=1)  # prints {"x": 1}
    give(x=2)  # prints {"x": 2}

    assert results == [1, 2]
Parameters
  • observer – The object that is to receive notifications.

  • on_error – Action to invoke upon exceptional termination of the observable sequence.

  • on_completed – Action to invoke upon graceful termination of the observable sequence.

  • on_next – Action to invoke for each element in the observable sequence.

Returns

An object representing the subscription with a dispose() method to remove it.

values()

Context manager to accumulate the stream into a list.

with given()["?x"].values() as results:
    give(x=1)
    give(x=2)

assert results == [1, 2]

Note that this will activate the root given() including all subscriptions that it has (directly or indirectly).

wrap(name, fn=None, pass_keys=False, return_function=False)

Subscribe a context manager, corresponding to wrap().

@gv.wrap("main")
def _():
    print("<")
    yield
    print(">")

with give.wrap("main"):    # prints <
    ...
    with give.wrap("sub"):
        ...
    ...
...                        # prints >
Parameters
  • name – The name of the wrap block to subscribe to.

  • fn – The wrapper function OR an object with an __enter__ method. If the wrapper is a generator, it will be wrapped with contextmanager(fn). If a function, it will be called with no arguments, or with the arguments given to give.wrap if pass_keys=True.

  • pass_keys – Whether to pass the arguments to give.wrap to this function as keyword arguments. You may use kwrap() as a shortcut to pass_keys=True.

__or__(other)

Alias for merge().

Merge this ObservableProxy with another.

__rshift__(subscription)

Alias for subscribe().

If subscription is a list or a set, accumulate into it.

__getitem__(item)

Mostly an alias for getitem().

Extra feature: if the item starts with "?", getitem is called with strict=False.

Other methods: All operators in the operator list have a corresponding method on Probe.

class ptera.probe.OverridableProbe(*selectors, raw=False)

Probe that allows overriding the values of variables.

OverridableProbe works essentially like Probe, but it also has the override() and koverride() methods, which can be used to inject different values in the probed variables.

OverridableProbe is triggered before Probe, so a Probe will see the changes of an OverridableProbe. However, if there are multiple operations on an OverridableProbe, these operations will proceed using the old, non-overriden values.

koverride(setter)

Override the value of the focus variable using a setter function with kwargs.

def f(x):
    ...
    y = 123
    ...

# This will basically override y = 123 to become y = x + 123
OverridableProbe("f(x) > y").koverride(lambda x, y: x + y)

Note

Important: override() only overrides the focus variable. The focus variable is the one to the right of >, or the one prefixed with !.

See override().

Parameters

setter – A function that takes a value from the pipeline as keyword arguments and produces the value to set the focus variable to.

override(setter=<function _identity>)

Override the value of the focus variable using a setter function.

# Increment a whenever it is set (will not apply recursively)
OverridableProbe("f > a")["a"].override(lambda value: value + 1)

Note

Important: override() only overrides the focus variable. The focus variable is the one to the right of >, or the one prefixed with !.

This is because a Ptera selector is triggered when the focus variable is set, so realistically it is the only one that it makes sense to override.

Be careful, because it is easy to write misleading code:

# THIS WILL SET y = x + 1, NOT x
OverridableProbe("f(x) > y")["x"].override(lambda x: x + 1)

Note

override will only work at the end of a synchronous pipe (map/filter are OK, but not e.g. sample)

Parameters

setter

A function that takes a value from the pipeline and produces the value to set the focus variable to.

  • If not provided, the value from the stream is used as-is.

  • If not callable, set the variable to the value of setter