# Copyright 2011-2014 Splunk, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"): you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from __future__ import absolute_import from . search_command import SearchCommand from . import csv class StreamingCommand(SearchCommand): """ Applies a transformation to search results as they travel through the processing pipeline. Streaming commands typically filter, sort, modify, or combine search results. Splunk will send search results in batches of up to 50,000 records. Hence, a search command must be prepared to be invoked many times during the course of pipeline processing. Each invocation should produce a set of results independently usable by downstream processors. By default Splunk may choose to run a streaming command locally on a search head and/or remotely on one or more indexers concurrently. The size and frequency of the search result batches sent to the command will vary based on scheduling considerations. Streaming commands are typically invoked many times during the course of pipeline processing. You can tell Splunk to run your streaming command locally on a search head, never remotely on indexers. .. code-block:: python @Configuration(local=False) class SomeStreamingCommand(StreamingCommand): ... If your streaming command modifies the time order of event records you must tell Splunk to ensure correct behavior. .. code-block:: python @Configuration(overrides_timeorder=True) class SomeStreamingCommand(StreamingCommand): ... :ivar input_header: :class:`InputHeader`: Collection representing the input header associated with this command invocation. :ivar messages: :class:`MessagesHeader`: Collection representing the output messages header associated with this command invocation. """ #region Methods def stream(self, records): """ Generator function that processes and yields event records to the Splunk processing pipeline. You must override this method. """ raise NotImplementedError('StreamingCommand.stream(self, records)') def _execute(self, operation, reader, writer): for record in operation(SearchCommand.records(reader)): writer.writerow(record) def _prepare(self, argv, input_file): ConfigurationSettings = type(self).ConfigurationSettings argv = argv[2:] if input_file is None: reader = None else: reader = csv.DictReader(input_file) return ConfigurationSettings, self.stream, argv, reader #endregion class ConfigurationSettings(SearchCommand.ConfigurationSettings): """ Represents the configuration settings that apply to a :code:`StreamingCommand`. """ #region Properties @property def local(self): """ Specifies whether this command should only be run on the search head. Default: :const:`False` """ return type(self)._local _local = False @property def overrides_timeorder(self): """ Specifies whether this command changes the time ordering of event records. Default: :const:`False` """ return type(self)._overrides_timeorder _overrides_timeorder = False @property def retainsevents(self): """ Specifies whether this command retains _raw events or transforms them. Default: :const:`True` """ return type(self)._retainsevents _retainsevents = True @property def streaming(self): """ Signals that this command is streamable. By default streamable commands may be run on the search head or one or more indexers, depending on performance scheduling considerations. This behavior may be overridden by setting :code:`local=True`. This forces a streamable command to be run on the search head. Fixed: True. """ return True #endregion #region Methods @classmethod def fix_up(cls, command): """ Verifies :code:`command` class structure. """ if command.stream == StreamingCommand.stream: raise AttributeError('No StreamingCommand.stream override') return #endregion