This is a Python module currently built for Python >= 2.7.
On a Unix-based system (including Raspberry Pi, Mac OS X, and Ubuntu):
\curl -sSL https://get.initialstate.com/python -o - | sudo bash
If you don't have curl, you should get it:
sudo apt-get install curl
sudo yum install curl
The package is hosted in PyPI under the package name ISStreamer.
-
(optional) Check if you have Python setuptools installed:
$ easy_install --version

If you don't see a version number come back, you should install setuptools:
$ sudo apt-get install python-setuptools

-
(optional) Check if you have pip installed:
$ pip --version

If you don't see a version number after running the above command, install pip using easy_install:
$ sudo easy_install pip
Install the ISStreamer module:

$ sudo pip install ISStreamer
After getting the ISStreamer module, usage is really simple. With the following example, you can do most of what you need, and you don't need to read further! However, the sections below cover the API in more detail.
from ISStreamer.Streamer import Streamer
# create or append to a Streamer instance
streamer = Streamer(bucket_name="Some Bucket Name", bucket_key="bucket_key", access_key="YourAccessKey")
# send some data
streamer.log("test", "hi")
streamer.log("temperature", 32)
# flush data (force the buffer to empty and send)
streamer.flush()
# close the stream
streamer.close()

-
In order to keep your event streams and visualizations contextually appropriate, we have implemented a concept called buckets. A new bucket is automatically created when the Streamer is constructed; however, if you want to append to an existing bucket, or use a key that is more memorable than the uuid that will otherwise be used, you can use the optional bucket_key constructor parameter. If a Streamer is constructed with a bucket_key that already exists, then any data sent in that Stream will append to the existing bucket.

NOTE: a bucket_key's uniqueness is scoped to a specific access_key.
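For example, constructing two Streamers with the same bucket_key under the same access_key writes to a single bucket. This is a minimal sketch; the bucket name and key below are illustrative:

from ISStreamer.Streamer import Streamer

# first run: creates the bucket, since this bucket_key doesn't exist yet
streamer = Streamer(bucket_name="Office Weather", bucket_key="office_weather", access_key="YourAccessKey")
streamer.log("temperature", 72)
streamer.close()

# later run: same bucket_key and access_key, so this data appends to the existing bucket
streamer = Streamer(bucket_name="Office Weather", bucket_key="office_weather", access_key="YourAccessKey")
streamer.log("temperature", 68)
streamer.close()

-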
An event stream is a key with an associated set of values with timestamps. These individual events are created every time the Streamer.log method is called. If an event_key is the same for different pieces of data, those pieces of data are represented together in an event stream. This is more of an Initial State concept than a Python Streamer specific concept.

Legacy Note: Event Streams used to be called Signals.
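For example, calling log repeatedly with the same key produces a single event stream with multiple timestamped values. A minimal sketch, with an illustrative bucket and key:

from ISStreamer.Streamer import Streamer

streamer = Streamer(bucket_name="Weather Station", access_key="YourAccessKey")

# three values, one event stream named "temperature"
streamer.log("temperature", 71.2)
streamer.log("temperature", 71.4)
streamer.log("temperature", 71.1)

streamer.close()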
-
This is the core method and API for the event streamer. It is an asynchronous method that pushes your data to a queue that handles sending it off to Initial State's servers. You don't have to worry about anything but calling the method where you want! For the sake of clarity (for those new to Python or programming), Streamer would be replaced with the variable reference to a Streamer instance.

The log method expects two parameters, key and value:

- key is a string that represents the source of the value and is limited to 250 characters.
- value is either a string, boolean, or number, and it represents a value at the time of the method call.

epoch is an optional parameter to override the epoch timestamp, recommended for advanced uses.
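For illustration, assuming streamer is a constructed Streamer instance, a few calls might look like this:

# value can be a string, boolean, or number
streamer.log("temperature", 72.5)
streamer.log("door_open", False)
streamer.log("status", "all systems go")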
-
This is an enhanced method that abstracts away writing a bunch of log statements to stream all of the values of an object with multiple data points at a specific time.
The log_object method expects one parameter, obj:

obj is either a list, a dict, or a simple object with attributes.

- If obj is a list, it will use the key name list_n unless the optional key_prefix is supplied, in which case it will use key_prefix_n, where n (in both cases) is an index.
- If obj is a dict, it will use the key name dict_key unless the optional key_prefix is supplied, in which case it will use key_prefix_key, where key (in both cases) is the key of the dictionary value.
- If obj is a simple object, it will iterate over the object's attributes and produce values for keys named obj_attr unless key_prefix is supplied, in which case it will use key_prefix_attr. In both cases, attr is the attribute name.

key_prefix is an optional string that, when supplied, overrides the default key prefixes. epoch is an optional number that represents the current time in epoch format.
NOTE: log_object will log multiple keys and values, but it will override the epoch timestamp of each value so that there is no CPU or iteration skew in the timestamps reported for when those values were logged and streamed.
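To make the key naming rules above concrete, here is a short sketch (assuming streamer is a constructed Streamer instance); the resulting key names are noted in the comments:

# a list logs as list_0, list_1, list_2 ...
streamer.log_object([70.1, 70.3, 70.2])

# ... unless key_prefix is supplied: reading_0, reading_1, reading_2
streamer.log_object([70.1, 70.3, 70.2], key_prefix="reading")

# a dict logs as dict_temperature and dict_humidity ...
streamer.log_object({"temperature": 72, "humidity": 41})

# ... unless key_prefix is supplied: sensor_temperature and sensor_humidity
streamer.log_object({"temperature": 72, "humidity": 41}, key_prefix="sensor")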
-
This method ensures that the log buffer is flushed, and it should be called when a program is exiting. It is also called during the __del__ magic method of the Streamer by Python, but it is a best practice to explicitly call it at the end of a program to ensure it is executed.
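A minimal sketch of that best practice, wrapping the work so close is called even if an exception is raised:

from ISStreamer.Streamer import Streamer

streamer = Streamer(bucket_name="My Run", access_key="YourAccessKey")
try:
    streamer.log("status", "running")
finally:
    # flushes the buffer and shuts the Streamer down
    streamer.close()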
-
You can manually flush of your own accord by calling Streamer.flush(). This will ensure that anything that has been queued or buffered locally is sent to Initial State's servers as soon as possible.
-
You can override the default event buffer size (the count of events) by passing the optional buffer_size parameter into the Streamer constructor. Here is an example:

streamer = Streamer(bucket_name="Hi!", access_key="YourAccessKey", buffer_size=200)
In this example, the buffer_size is being increased to 200 from the default, 10. The decision to override this value should be based on how many log statements you make in a loop before sleeping. You can typically play around with this number to help tune the performance of the Streamer. Additionally, a higher buffer_size may help if you are getting throttled by the API.

Another great example of overriding the buffer_size would be to increase it to a large value and use the Streamer.flush() method at the end of each iteration to effectively have a dynamic buffer size. Here is an example:

...
streamer = Streamer(bucket_name="Dynamic Buffer", access_key="YourAccessKey", buffer_size=200)
counter = 0
while 1:
    streamer.log("loop_counter", counter)
    some_dict = { "a": 1, "b": 2, "c": 3 }
    streamer.log_object(some_dict, key_prefix="some_dict")
    dynamic_list = SomeOtherModule.PracticalClass.get_stuff()
    # We don't know how many items will be in dynamic_list, so we just
    # log the list to get them all. However, since we don't know how many
    # there will be, we don't know the optimal buffer size, so we set the
    # buffer to a high value and flush at the end of the iteration
    streamer.log_object(dynamic_list)
    # Here we flush the log buffer to ensure that there isn't a delay
    # in sending the logs that we've created prior to this point while
    # waiting for the sleep to finish
    streamer.flush()
    # increment counter
    counter += 1
    # sleep for 10 seconds
    time.sleep(10)
...
-
Some have asked for the ability to override the timestamp. Currently, the timestamp is automatically associated with data by retrieving the most accurate timestamp possible from the device as soon as a log or log_object method is called. However, you can override this by doing the following:

import time

now = time.time()
streamer.log("signal", 5, epoch=now)
For a full example, check out this.
-
When you construct a Streamer, the constructor expects a name or a key that it will use to ensure there is a bucket to use as the context for Streamer.log(key, value). Buckets are either created or consumed based on the unique combination of an access_key and a bucket_key. If you want to switch to a new bucket, because, say, you've started a new session or run, simply call Streamer.set_bucket(bucket_name='some_bucket_name'[, bucket_key='some_bucket_key']). Note that bucket_key is optional; if it is not provided, the module will create a uuid4 as the bucket_key. Here is an example:

streamer = Streamer(bucket_name="Starting Bucket", access_key="YourAccessKey")
streamer.log("key1", "starting")
streamer.set_bucket(bucket_name="New Bucket")
streamer.log("key1", "starting")
In this example, you will get a key1=starting in two different buckets: "Starting Bucket" and "New Bucket".
If the Streamer cannot ship a set of events during a flush, it will retry a few times before deeming it a failure. If it does fail, it will attempt to save its payload to a local file. This payload will be in JSON format inside a JSON array. Each array can be individually submitted to Initial State's events API to fill in any missed events.
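If you need to inspect or resubmit a saved payload, it can be read with the standard json module. This is a sketch only; the file name below is hypothetical, so substitute whichever file the Streamer actually wrote:

import json

# hypothetical path; use the file the Streamer saved on failure
with open("isstreamer_failed_payload.json") as f:
    events = json.load(f)  # a JSON array of event objects

for event in events:
    print(event)  # each entry can be resubmitted to the events API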
If you're having issues with your data, you might want to try running ISStreamer at a higher debug level:

logger = Streamer(bucket_name="SomeBucketName", access_key="YourAccessKey", debug_level=2)

With a debug_level at or greater than 2, the streamer will throw exceptions on logging errors. Otherwise, it will assume logging errors are not fundamentally exceptional. It will also display more verbose logging information.
If you want to simply see what's being logged in the console, construct the Streamer object with the debug_level equal to 1.