8686#
8787# ==== Configuring multiple SQL statements
8888#
89- # Configuring multiple SQL statements is useful when there is a need to query and ingest data
90- # from different database tables or views. It is possible to define separate Logstash
91- # configuration files for each statement or to define multiple statements in a single configuration
92- # file. When using multiple statements in a single Logstash configuration file, each statement
93- # has to be defined as a separate jdbc input (including jdbc driver, connection string and other
94- # required parameters).
89+ # Configuring multiple SQL statements is useful when there is a need to query and ingest data
90+ # from different database tables or views. It is possible to define separate Logstash
91+ # configuration files for each statement, to define multiple statements in a single configuration
92+ # file or to use the `statements_directory` parameter where all statements within the configured
93+ # directory get executed. When using multiple statements in a single Logstash configuration file,
94+ # each statement has to be defined as a separate jdbc input (including jdbc driver, connection
95+ # string and other required parameters).
9596#
9697# Please note that if any of the statements use the `sql_last_value` parameter (e.g. for
9798# ingesting only data changed since last run), each input should define its own
@@ -147,6 +148,9 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
147148 # Path of file containing statement to execute
148149 config :statement_filepath , :validate => :path
149150
151+ # Directory containing statement files to execute
152+ config :statements_directory , :validate => :path
153+
150154 # Hash of query parameter, for example `{ "target_id" => "321" }`
151155 config :parameters , :validate => :hash , :default => { }
152156
@@ -205,11 +209,15 @@ def register
205209 require "rufus/scheduler"
206210 prepare_jdbc_connection
207211
208- # Raise an error if @use_column_value is true, but no @tracking_column is set
209212 if @use_column_value
213+ # Raise an error if @use_column_value is true, but no @tracking_column is set
210214 if @tracking_column . nil?
211215 raise ( LogStash ::ConfigurationError , "Must set :tracking_column if :use_column_value is true." )
212216 end
217+ # Raise an error if @use_column_value is true, and @statements_directory is set
218+ if @statements_directory
219+ raise ( LogStash ::ConfigurationError , ":statements_directory must not be set if :use_column_value is true." )
220+ end
213221 end
214222
215223 @enable_encoding = !@charset . nil? || !@columns_charset . empty?
@@ -221,12 +229,24 @@ def register
221229 @sql_last_value = YAML . load ( File . read ( @last_run_metadata_path ) )
222230 end
223231
224- unless @statement . nil? ^ @statement_filepath . nil?
232+ if @statement && @statement_filepath
225233 raise ( LogStash ::ConfigurationError , "Must set either :statement or :statement_filepath. Only one may be set at a time." )
226234 end
227235
228236 @statement = File . read ( @statement_filepath ) if @statement_filepath
229237
238+ unless @statement . nil? ^ @statements_directory . nil?
239+ raise ( LogStash ::ConfigurationError , "Must set either :statement, :statement_filepath or :statements_directory. Only one may be set at a time" )
240+ end
241+
242+ @statements = [ ]
243+ if @statements_directory
244+ Dir . foreach ( @statements_directory ) do |file |
245+ next if File . directory? file
246+ @statements . push ( File . read ( @statements_directory + '/' + file ) )
247+ end
248+ end
249+
230250 if ( @jdbc_password_filepath and @jdbc_password )
231251 raise ( LogStash ::ConfigurationError , "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time." )
232252 end
@@ -246,13 +266,25 @@ def run(queue)
246266 if @schedule
247267 @scheduler = Rufus ::Scheduler . new ( :max_work_threads => 1 )
248268 @scheduler . cron @schedule do
249- execute_query ( queue )
269+ if @statements . any?
270+ for statement in @statements
271+ execute_query ( queue , statement )
272+ end
273+ else
274+ execute_query ( queue , @statement )
275+ end
250276 update_state_file
251277 end
252278
253279 @scheduler . join
254280 else
255- execute_query ( queue )
281+ if @statements . any?
282+ for statement in @statements
283+ execute_query ( queue , statement )
284+ end
285+ else
286+ execute_query ( queue , @statement )
287+ end
256288 update_state_file
257289 end
258290 end # def run
@@ -265,10 +297,10 @@ def stop
265297
266298 private
267299
268- def execute_query ( queue )
300+ def execute_query ( queue , statement )
269301 # update default parameters
270302 @parameters [ 'sql_last_value' ] = @sql_last_value
271- execute_statement ( @ statement, @parameters ) do |row |
303+ execute_statement ( statement , @parameters ) do |row |
272304 if enable_encoding?
273305 ## do the necessary conversions to string elements
274306 row = Hash [ row . map { |k , v | [ k . to_s , convert ( k , v ) ] } ]
0 commit comments