PL/SQL Batch/Stream Processor
Example Description
- The package 'BATCH_STREAM_PROCESSOR' contains the function 'JOB1', which returns one typed ref cursor. The number of generated rows is set by the input parameter 'i_number_of_rows'.
- The Java program 'BatchStreamProcessor' processes each element of the stream. In this example, each element is simply printed to System.out.
Important Notes
- Use two different data sources/connections when the processed rows are written back to the database:
- One data source/connection for the batch/stream processor for reading.
- One data source/connection for writing the data.
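The read/write split described above can be sketched in plain Java. This is a minimal illustration, not part of the generated API: the class 'ChunkedWriter', its 'copy' method and the chunk size are hypothetical names chosen for this example. The stream stands for the rows delivered over the reading connection, and the 'writer' callback stands for whatever persists a chunk over the second connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Hypothetical helper: drains a row stream and hands fixed-size chunks to a separate writer. */
final class ChunkedWriter {

    private ChunkedWriter() {
    }

    /**
     * Reads all rows from the stream (reader side) and passes them in chunks
     * to the writer callback (writer side, assumed to use its own connection).
     *
     * @return number of chunks handed to the writer
     */
    static <T> int copy(Stream<T> rows, int chunkSize, Consumer<List<T>> writer) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<T> buffer = new ArrayList<>(chunkSize);
        int[] chunks = { 0 };
        // closing the stream releases the reader-side resources
        try (Stream<T> s = rows) {
            s.forEachOrdered(row -> {
                buffer.add(row);
                if (buffer.size() == chunkSize) {
                    writer.accept(new ArrayList<>(buffer)); // hand over a copy
                    buffer.clear();
                    chunks[0]++;
                }
            });
        }
        // flush the remaining rows that did not fill a whole chunk
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));
            chunks[0]++;
        }
        return chunks[0];
    }
}
```

In a real program the writer callback would typically run a batched insert and commit on the writing connection, so that the commit does not interfere with the cursor that is still open on the reading connection.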
Package Specification
The package 'BATCH_STREAM_PROCESSOR' defines a typed ref cursor.
PL/SQL Package Specification : BATCH_STREAM_PROCESSOR
create or replace package batch_stream_processor
/**
 * PL/SQL Batch/Stream processor demonstration.
 */
as
    -- PL/SQL record row declaration for batch/stream processing
    type rec_batch_row is record (
        no          number(9),
        description varchar2(100),
        ts          timestamp
    );

    -- cursor definition
    type c_batch_row is ref cursor return rec_batch_row;

    /**
     * Function returns one cursor for batch processing.
     *
     * @param i_number_of_rows Number of rows to generate.
     * @return Typed ref cursor returning rows of type rec_batch_row
     */
    function job1 (i_number_of_rows in number) return c_batch_row;
end batch_stream_processor;
Package Body
The package body 'BATCH_STREAM_PROCESSOR' implements the function 'JOB1', which opens and returns the typed ref cursor.
PL/SQL Package Body : BATCH_STREAM_PROCESSOR
create or replace package body batch_stream_processor
as
    function job1 (i_number_of_rows in number) return c_batch_row
    is
        c c_batch_row;
    begin
        -- generate dummy list of rows
        open c for
            select level,
                   'any description no. ' || level,
                   systimestamp+level+level/12.
              from dual
           connect by level<=i_number_of_rows;

        -- return cursor
        return c;
    end job1;
end batch_stream_processor;
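To make the generated data easier to picture, the query in 'JOB1' can be mirrored in plain Java. This is only a sketch under assumptions: the class 'Job1RowSketch' and its nested 'Row' type are hypothetical stand-ins for the PL/SQL record, a positive row count is assumed, and the current time is passed in as a parameter. In Oracle datetime arithmetic a number is interpreted as a number of days, so 'level' adds whole days and 'level/12' adds two hours per level. The timestamp is truncated to seconds because adding a number to a timestamp yields a DATE value.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical plain-Java mirror of the rows produced by job1. */
final class Job1RowSketch {

    /** Stand-in for the PL/SQL record rec_batch_row. */
    static final class Row {
        final int no;
        final String description;
        final LocalDateTime ts;

        Row(int no, String description, LocalDateTime ts) {
            this.no = no;
            this.description = description;
            this.ts = ts;
        }
    }

    /** Generates rows 1..numberOfRows (a positive count is assumed). */
    static List<Row> generate(LocalDateTime now, int numberOfRows) {
        LocalDateTime base = now.truncatedTo(ChronoUnit.SECONDS);
        return IntStream.rangeClosed(1, numberOfRows)
                .mapToObj(level -> new Row(
                        level,
                        "any description no. " + level,
                        // level days plus level/12 days (= 2 * level hours)
                        base.plusDays(level).plusHours(2L * level)))
                .collect(Collectors.toList());
    }
}
```

For example, with a base time of 2024-01-01 00:00:00 the third row carries the timestamp 2024-01-04 06:00:00.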
Factory API : Calling the PL/SQL package
Use the static factory to get the remote service.
Java Calling Class : BatchStreamProcessorFactoryApi.java
package batch_stream_processor_examples.factoryapi;

import java.util.stream.Stream;

import factory.ExamplesRPCFactory;
import service.BatchStreamProcessorService;
import transferobject.BatchStreamProcessorTO;

public class BatchStreamProcessorFactoryApi {

    public static void main(String[] args) {
        try {
            // set database credentials and configuration parameters
            System.setProperty("dbw_examples.url", "jdbc:oracle:thin:@192.168.0.109:1521/orcl");
            System.setProperty("dbw_examples.username", "dbw_examples");
            System.setProperty("dbw_examples.password", "dbw_examples");

            // getting the service
            BatchStreamProcessorService service = ExamplesRPCFactory.getBatchStreamProcessorService();

            // use try-with-resources statement to free resources after call
            try (Stream<BatchStreamProcessorTO.RecBatchRow> stream = service.job1Stream(1000, 0)) {
                stream.forEach(e -> System.out.format("no:%d description:%s timestamp:%3$tD %3$tT%n",
                        e.no,
                        e.description,
                        e.ts));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
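The try-with-resources statement matters because a terminal operation such as 'forEach' does not close a stream on its own. Close handlers registered with 'Stream.onClose' run only when 'close()' is called. The following self-contained sketch shows this with an in-memory stream. The class 'StreamCloseSketch' is hypothetical, and it is an assumption of this example that the generated service releases its cursor and connection in a similar close handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical demo of Stream.onClose in combination with try-with-resources. */
final class StreamCloseSketch {

    /** Consumes the stream inside try-with-resources; the close handler runs at the end. */
    static List<String> runWithClose() {
        List<String> events = new ArrayList<>();
        // the close handler stands in for "release cursor and connection"
        Stream<Integer> rows = Stream.of(1, 2, 3).onClose(() -> events.add("closed"));
        try (Stream<Integer> stream = rows) {
            stream.forEachOrdered(n -> events.add("row " + n));
        }
        return events;
    }

    /** Consumes the stream without closing it; the close handler is never called. */
    static List<String> runWithoutClose() {
        List<String> events = new ArrayList<>();
        Stream<Integer> rows = Stream.of(1, 2, 3).onClose(() -> events.add("closed"));
        rows.forEachOrdered(n -> events.add("row " + n));
        return events;
    }
}
```

'runWithClose()' records the three rows followed by "closed", while 'runWithoutClose()' records only the three rows.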
Spring API : Calling the PL/SQL package
Use Spring annotations to inject the service and call the remote service.
Java Calling Class : BatchStreamProcessorSpringApi.java
package batch_stream_processor_examples.springapi;

import java.util.stream.Stream;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import service.BatchStreamProcessorService;
import transferobject.BatchStreamProcessorTO;

@Configuration
@ComponentScan(basePackages = { "impl" })
@Component
public class BatchStreamProcessorSpringApi {

    @Autowired
    private BatchStreamProcessorService batchStreamProcessorService;

    public static void main(String[] args) {
        // set database credentials and configuration parameters
        System.setProperty("dbw_examples.url", "jdbc:oracle:thin:@192.168.0.109:1521/orcl");
        System.setProperty("dbw_examples.username", "dbw_examples");
        System.setProperty("dbw_examples.password", "dbw_examples");

        // Register Spring Beans, Spring Context and call demo method
        try (
            AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(BatchStreamProcessorSpringApi.class))
        {
            BatchStreamProcessorSpringApi demo = ctx.getBean(BatchStreamProcessorSpringApi.class);
            demo.runDemo();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void runDemo() {
        // use try-with-resources statement to free resources after call
        try (Stream<BatchStreamProcessorTO.RecBatchRow> stream = batchStreamProcessorService.job1Stream(1000, 0)) {
            stream.forEach(e -> System.out.format("no:%d description:%s timestamp:%3$tD %3$tT%n",
                    e.getNo(),
                    e.getDescription(),
                    e.getTs()));
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public DataSource getDataSource() throws Exception {
        PoolDataSource poolDataSource = PoolDataSourceFactory.getPoolDataSource();
        poolDataSource.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        poolDataSource.setURL(System.getProperty("dbw_examples.url"));
        poolDataSource.setUser(System.getProperty("dbw_examples.username"));
        poolDataSource.setPassword(System.getProperty("dbw_examples.password"));
        poolDataSource.setInitialPoolSize(1);
        poolDataSource.setMinPoolSize(2);
        poolDataSource.setMaxPoolSize(10);
        poolDataSource.setLoginTimeout(10);
        poolDataSource.setInactiveConnectionTimeout(30);
        poolDataSource.setTimeoutCheckInterval(15);
        poolDataSource.setValidateConnectionOnBorrow(true);
        poolDataSource.setConnectionWaitTimeout(60);
        return poolDataSource;
    }
}
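Both calling classes print each row with the format string "no:%d description:%s timestamp:%3$tD %3$tT%n". The '3$' argument index makes both date/time conversions read the third argument, so the timestamp is passed once and formatted twice: as a date with 'tD' (month/day/two-digit year) and as a 24-hour time with 'tT'. The sketch below isolates that behaviour. The class 'RowFormatSketch' is hypothetical, and 'LocalDateTime' is used as an assumed stand-in for the timestamp type of the generated transfer object.

```java
import java.time.LocalDateTime;
import java.util.Locale;

/** Hypothetical demo of the format string used to print one row. */
final class RowFormatSketch {

    /** Formats one row; %3$ addresses the third argument for both date/time conversions. */
    static String format(int no, String description, LocalDateTime ts) {
        return String.format(Locale.ROOT,
                "no:%d description:%s timestamp:%3$tD %3$tT",
                no, description, ts);
    }

    public static void main(String[] args) {
        // prints: no:7 description:any description no. 7 timestamp:01/15/24 13:05:09
        System.out.println(format(7, "any description no. 7",
                LocalDateTime.of(2024, 1, 15, 13, 5, 9)));
    }
}
```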