PL/SQL Batch/Stream Processor

Example Description

  • The package 'BATCH_STREAM_PROCESSOR' contains function 'JOB1' returning one typed ref cursor. The generated number of rows is configured by input parameter 'i_number_of_rows'.
  • The Java program 'BatchStreamProcessor' processes each element of the stream. In this case just printed to System.out.

Important Notes !

  • Use two different data sources/connections when processing the rows to the database again.
    • One data source/connection for the batch/stream processor for reading.
    • One data source/connection for writing the data.

Package Specification

The package 'BATCH_STREAM_PROCESSOR' defines a typed ref cursor.
PL/SQL Package Specification : BATCH_STREAM_PROCESSOR
create or replace package batch_stream_processor
/**
 * PL/SQL Batch/Stream processor demonstration.
 */
as

-- PL/SQL record row declaration for batch/stream processing
type rec_batch_row is record (
  no            number(9),
  description   varchar2(100),
  ts            timestamp
);

-- cursor definition
type c_batch_row is ref cursor return rec_batch_row;

/**
 * Functions returns one cursor for batch processing.
 *
 * @param i_number_of_rows Number of rows to generate.
 * @return Row of PL/SQL record
 */
function job1 (i_number_of_rows in number) return c_batch_row;

end batch_stream_processor;

Package Body

The package body 'BATCH_STREAM_PROCESSOR' implements a typed ref cursor.
PL/SQL Package Body : BATCH_STREAM_PROCESSOR
create or replace package body batch_stream_processor
as

function job1 (i_number_of_rows in number) return c_batch_row
is
  c c_batch_row;
begin
  -- generate dummy list of rows
  open c for
    select  level, 
            'any description no. ' || level, 
            systimestamp+level+level/12. 
      from  dual 
      connect by level<=i_number_of_rows;

  -- return cursor
  return c;
end job1;

end batch_stream_processor;

Factory API : Calling the PL/SQL package

Using the static factory to get the remote service.
Java Calling Class : BatchStreamProcessorFactoryApi.java
package batch_stream_processor_examples.factoryapi;

import java.util.stream.Stream;

import factory.ExamplesRPCFactory;
import service.BatchStreamProcessorService;
import transferobject.BatchStreamProcessorTO;

public class BatchStreamProcessorFactoryApi {
  public static void main(String[] args) {
    try {
      // set database credentials and configuration parameters
      System.setProperty("dbw_examples.url", "jdbc:oracle:thin:@192.168.0.109:1521/orcl");
      System.setProperty("dbw_examples.username", "dbw_examples");
      System.setProperty("dbw_examples.password", "dbw_examples");

      // getting the service
      BatchStreamProcessorService service = ExamplesRPCFactory.getBatchStreamProcessorService();

      // use try-with-resources statement to free resources after call
      try (Stream<BatchStreamProcessorTO.RecBatchRow> stream = service.job1Stream(1000, 0)) {
        stream.forEach(e -> System.out.format("no:%d   description:%s   timestamp:%3$tD %3$tT%n",
                                              e.no,
                                              e.description,
                                              e.ts));
      }
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Spring API : Calling the PL/SQL package

Using Spring annotation to inject the service and call the remote service.
Java Calling Class : BatchStreamProcessorSpringApi.java
package batch_stream_processor_examples.springapi;

import java.util.stream.Stream;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import service.BatchStreamProcessorService;
import transferobject.BatchStreamProcessorTO;

@Configuration
@ComponentScan(basePackages = { "impl" })
@Component
public class BatchStreamProcessorSpringApi {
  @Autowired
  private BatchStreamProcessorService batchStreamProcessorService;

  public static void main(String[] args) {
    // set database credentials and configuration parameters
    System.setProperty("dbw_examples.url", "jdbc:oracle:thin:@192.168.0.109:1521/orcl");
    System.setProperty("dbw_examples.username", "dbw_examples");
    System.setProperty("dbw_examples.password", "dbw_examples");
    
    // Register Spring Beans, Spring Context and call demo method 
    try (
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(BatchStreamProcessorSpringApi.class))
    {
      BatchStreamProcessorSpringApi demo = ctx.getBean(BatchStreamProcessorSpringApi.class);
      demo.runDemo();
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void runDemo() {
    // use try-with-resources statement to free resources after call
    try (Stream<BatchStreamProcessorTO.RecBatchRow> stream = batchStreamProcessorService.job1Stream(1000, 0)) {
      stream.forEach(e -> System.out.format("no:%d   description:%s   timestamp:%3$tD %3$tT%n",
                                            e.getNo(),
                                            e.getDescription(),
                                            e.getTs()));
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Bean
  public DataSource getDataSource() throws Exception {
    PoolDataSource poolDataSource = PoolDataSourceFactory.getPoolDataSource();
    poolDataSource.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
    poolDataSource.setURL(System.getProperty("dbw_examples.url"));
    poolDataSource.setUser(System.getProperty("dbw_examples.username"));
    poolDataSource.setPassword(System.getProperty("dbw_examples.password"));
    poolDataSource.setInitialPoolSize(1);
    poolDataSource.setMinPoolSize(2);
    poolDataSource.setMaxPoolSize(10);
    poolDataSource.setLoginTimeout(10);
    poolDataSource.setInactiveConnectionTimeout(30);
    poolDataSource.setTimeoutCheckInterval(15);
    poolDataSource.setValidateConnectionOnBorrow(true);
    poolDataSource.setConnectionWaitTimeout(60);
    return poolDataSource;
  }
}