
Tuesday 25 June 2013

Greenplum GPLOAD with updates/merge ability

“gpload” is a data loading utility that acts as an interface to Greenplum Database’s external table parallel loading feature. The Greenplum EXTERNAL TABLE feature allows us to define network data sources as tables that we can query, which speeds up the data loading process. Using a load specification defined in a YAML formatted control file, gpload executes a load by invoking gpfdist (Greenplum’s parallel file distribution program), creating an external table definition based on the source data defined, and executing an INSERT, UPDATE or MERGE operation to load the source data into the target table in the database.
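
Under the covers an INSERT mode load is roughly equivalent to the SQL below. This is a minimal sketch only; the external table name "ext_people" is hypothetical, since gpload creates and drops the real one for us.

-- external table served by the gpfdist instance that gpload starts
CREATE EXTERNAL TABLE ext_people (id int, name text)
LOCATION ('gpfdist://127.0.0.1:8100/person.txt')
FORMAT 'TEXT' (DELIMITER ',');

-- parallel load from the external table into the target table
INSERT INTO apples.people SELECT * FROM ext_people;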

In this example we show how to load 500,000 records and then update those same 500,000 records using gpload.

1. Create a table to load the data into as shown below.

drop table if exists apples.people cascade;

create table apples.people (id int, name text)
DISTRIBUTED BY (id);

2. The data we are loading is defined as follows:

[Tue Jun 25 22:39:24 gpadmin@:~/demos/gpload/inserts ] $ head person.txt
1,person1
2,person2
3,person3
4,person4
5,person5
6,person6
7,person7
8,person8
9,person9
10,person10
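
A quick way to generate a 500,000 row file like this, assuming psql access on the master and a server-writable path (a sketch only):

COPY (SELECT i, 'person' || i FROM generate_series(1, 500000) i)
TO '/Users/gpadmin/demos/gpload/inserts/person.txt'
WITH DELIMITER ',';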

3. Create a YAML formatted control file to load the data into the table PEOPLE as shown below.

test.yml
  
VERSION: 1.0.0.1
DATABASE: gpadmin
USER: pas
HOST: 127.0.0.1
PORT: 5432
GPLOAD:
    INPUT:
     - SOURCE:
          LOCAL_HOSTNAME:
            - 127.0.0.1 
          PORT: 8100
          FILE: [ /Users/gpadmin/demos/gpload/inserts/person.txt ]
     - COLUMNS:
         - "id":
         - "name": 
     - FORMAT: text
     - DELIMITER: ','
     - ENCODING: 'UTF8'
     - NULL_AS: ''
     - ERROR_LIMIT: 100000
     - ERROR_TABLE: apples.err_people
    OUTPUT:
     - TABLE: apples.people 
     - MODE: INSERT  

4. Run a script to load the data as shown below.

Script:

gpload -f test.yml

Output:

[Tue Jun 25 22:44:29 gpadmin@:~/demos/gpload/inserts ] $ ./do-gpload.sh 
2013-06-25 22:44:35|INFO|gpload session started 2013-06-25 22:44:35
2013-06-25 22:44:35|INFO|started gpfdist -p 8100 -P 8101 -f "/Users/gpadmin/demos/gpload/inserts/person.txt" -t 30
2013-06-25 22:44:35|INFO|running time: 0.73 seconds
2013-06-25 22:44:35|INFO|rows Inserted          = 500000
2013-06-25 22:44:35|INFO|rows Updated           = 0
2013-06-25 22:44:35|INFO|data formatting errors = 0
2013-06-25 22:44:35|INFO|gpload succeeded
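
Before moving on, a quick sanity check; this should report 500000, matching the "rows Inserted" figure above:

gpadmin=# select count(*) from apples.people;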

Now at this point let's update the same data set with new data, once again using gpload, BUT this time the YAML control file is a little different.

5. The data we are UPDATING is defined as follows:

[Tue Jun 25 22:54:39 gpadmin@:~/demos/gpload/upserts ] $ head people.txt 
1,person1-updated
2,person2-updated
3,person3-updated
4,person4-updated
5,person5-updated
6,person6-updated
7,person7-updated
8,person8-updated
9,person9-updated
10,person10-updated

6. Create a YAML formatted control file to merge/update the data into the table PEOPLE as shown below.

test.yml
  
VERSION: 1.0.0.1
DATABASE: gpadmin
USER: pas
HOST: 127.0.0.1
PORT: 5432
GPLOAD:
    INPUT:
     - SOURCE:
          LOCAL_HOSTNAME:
            - 127.0.0.1 
          PORT: 8100
          FILE: [ /Users/gpadmin/demos/gpload/upserts/people.txt ]
     - COLUMNS:
         - id: int 
         - name: text 
     - FORMAT: text
     - DELIMITER: ','
     - ENCODING: 'UTF8'
     - NULL_AS: ''
     - ERROR_LIMIT: 100000
     - ERROR_TABLE: apples.err_people
    OUTPUT:
     - TABLE: apples.people 
     - MODE: MERGE
     - MATCH_COLUMNS:
           - id
     - UPDATE_COLUMNS:
           - name 
     - MAPPING:
         id: id 
         name: name  
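
With MODE: MERGE, rows that match an existing row on the MATCH_COLUMNS are updated, and all other rows are inserted. Conceptually the merge behaves like the SQL below (a sketch only; ext_people again stands in for the external table gpload creates):

-- update rows that match on id
UPDATE apples.people p
SET name = e.name
FROM ext_people e
WHERE p.id = e.id;

-- insert rows with no match on id
INSERT INTO apples.people (id, name)
SELECT e.id, e.name
FROM ext_people e
WHERE NOT EXISTS (SELECT 1 FROM apples.people p WHERE p.id = e.id);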

7. Run a script to update the PEOPLE table data as shown below.

Script:

gpload -f test.yml

Output:

[Tue Jun 25 22:54:21 gpadmin@:~/demos/gpload/upserts ] $ ./do-gpload.sh    
2013-06-25 22:54:34|INFO|gpload session started 2013-06-25 22:54:34
2013-06-25 22:54:34|INFO|started gpfdist -p 8100 -P 8101 -f "/Users/gpadmin/demos/gpload/upserts/people.txt" -t 30
2013-06-25 22:54:39|INFO|running time: 5.13 seconds
2013-06-25 22:54:39|INFO|rows Inserted          = 0
2013-06-25 22:54:39|INFO|rows Updated           = 500000
2013-06-25 22:54:39|INFO|data formatting errors = 0
2013-06-25 22:54:39|INFO|gpload succeeded

8. Finally, verify that we have indeed updated the data in the PEOPLE table.
  
gpadmin=# select * from apples.people limit 10;
 id |       name       
----+------------------
  1 | person1-updated
  2 | person2-updated
  3 | person3-updated
  4 | person4-updated
  5 | person5-updated
  6 | person6-updated
  7 | person7-updated
  8 | person8-updated
  9 | person9-updated
 10 | person10-updated
(10 rows)

Thursday 13 June 2013

Creating a multi-threaded insert client for SQLFire 1.1

In this example we show how to create a multi-threaded insert client that inserts 1,000,000 records into a SQLFire table. The table is partitioned with synchronous persistence turned on. The distributed system includes one locator and 5 data members.

1. Create the table as shown below.
  
drop diskstore store1;

CREATE DISKSTORE STORE1;

drop table person;

create table person 
(id int primary key,
 name varchar(40))
PARTITION BY COLUMN (id)
REDUNDANCY 1
PERSISTENT 'STORE1' SYNCHRONOUS;
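
With the distributed system up (one locator and five servers), the members can be listed from an sqlf session. A sketch, assuming SQLFire's SYS.MEMBERS system view:

sqlf> select id, kind from sys.members;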

2. Multi-threaded insert client code.
  
package com.pivotal.fe.test;

import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MultiThreadInsert 
{
 private String url;
 private String driverClassName;
 private int RECORDS;
 private int COMMIT_POINT;
 private int nThreads;
 
 public MultiThreadInsert() throws IOException 
 {
  loadProperties();
 }
 
 public void loadProperties() throws IOException
 {
     Properties props = new Properties();
     URL propertiesUrl = ClassLoader.getSystemResource("sqlfiretest.properties");
     props.load(propertiesUrl.openStream());
     
     url = (String) props.getProperty("url");
     driverClassName = (String) props.getProperty("driverclassname");
     RECORDS = Integer.parseInt((String) props.getProperty("records"));
     COMMIT_POINT = Integer.parseInt((String) props.getProperty("commit_point"));
     nThreads = Integer.parseInt((String) props.getProperty("nThreads"));
     
 }
 
 @SuppressWarnings("unchecked")
 public void start() throws InterruptedException, SQLException 
 {
  System.out.printf("Starting %d threads for %d records connecting to %s\n", nThreads, RECORDS, url);
  
        final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

        List<Callable<Object>> list = new ArrayList<Callable<Object>>();
        for (int i = 0; i < nThreads; i++) {
            list.add(new RunData(url, driverClassName, i + 1));
        }
        long start = System.currentTimeMillis();
        
        List<Future<Object>> tasks = executorService.invokeAll(list, 5, TimeUnit.MINUTES);
        
        for (Future<Object> f : tasks) {
            try {
                f.get();
            } catch (ExecutionException e) {
                // surface any failure from a worker thread
                e.printStackTrace();
            }
        }
        
     long end = System.currentTimeMillis() - start;
     
     float elapsedTimeSec = end/1000F;

        System.out.println(String.format("Elapsed time in seconds %f", elapsedTimeSec));
        
     executorService.shutdown();
        System.exit(0);
 }
 
 private class RunData implements Callable<Object> 
 {
     int counter = 0;
        int increment;
        Connection conn;
        String url;
        String driverClassName;
        
        private RunData(String url, String driverClassName, int increment) 
        {
            this.increment = increment;
            this.url = url;
            this.driverClassName = driverClassName;
        }

     public Connection getConnection() throws SQLException, ClassNotFoundException
     {
      Class.forName(driverClassName);
      Connection conn = null;
      conn = DriverManager.getConnection(url);
      //System.out.println("auto commit = " + conn.getAutoCommit());
      return conn; 
     }
     
        public void run() 
        {
      PreparedStatement stmt = null;
      String sql = "insert into person values (?, ?)";
      int size = 0; // the counter field counts inserts so call() can return the per-thread total
      long startTime, endTime;
      
            int dataSize = RECORDS / nThreads;
            try 
            {
                conn = getConnection();
            } 
            catch (Exception e1) 
            {
                // fail fast; without a connection the inserts below cannot run
                throw new RuntimeException(e1);
            }
            
            System.out.printf("Start: %d  End: %d \n",(dataSize * (increment - 1)), (dataSize * increment));
      try 
      {
       stmt = conn.prepareStatement(sql);
       
       startTime = System.currentTimeMillis();
       for (int i = (dataSize * (increment - 1)); i < (dataSize * increment); i++)
       {
        counter = counter + 1;
        size = size + 1;
        stmt.setInt(1, i);
        stmt.setString(2, "Person" + i);
        stmt.addBatch();
        
        if (counter % COMMIT_POINT == 0)
        {
         stmt.executeBatch();
         endTime = System.currentTimeMillis();
         System.out.printf("Insert batch of %d records took | %d | milliseconds\n", size, (endTime - startTime));
         startTime = System.currentTimeMillis();
         size = 0;
        }
       }
       
       /* flush any remaining records that did not fill a complete batch */
       stmt.executeBatch();
       
       //System.out.printf("Number of records submitted %d.\n", counter);
       
                 
      } 
   catch (SQLException e) 
   {
    // report the SQL error for this thread's inserts
    e.printStackTrace();
   }
   finally
   {
    if (stmt != null)
    {
     try 
     {
      stmt.close();
     } 
     catch (SQLException e) 
     {
      // nothing more we can do if close fails
      e.printStackTrace();
     }
    }
   }      
        }
        
        public Object call() throws Exception 
        {
            run();
            return counter;
        }
  
 }

 /**
  * @param args
  * @throws InterruptedException 
  * @throws SQLException 
  * @throws IOException 
  */
 public static void main(String[] args) throws InterruptedException, SQLException, IOException 
 {
  MultiThreadInsert test = new MultiThreadInsert();
  test.start();
 }
}
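
The client reads its settings from a sqlfiretest.properties file on the classpath. A sketch matching the run below; the driver class name is an assumption based on SQLFire's client JDBC driver:

url=jdbc:sqlfire://127.0.0.1:1527/;single-hop-enabled=true
driverclassname=com.vmware.sqlfire.jdbc.ClientDriver
records=1000000
commit_point=10000
nThreads=8
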
3. Output from a run is shown below.

Note: This was run on my Mac laptop, which had all 5 cache servers running on it. It would perform much better with a separate physical machine for each of the SQLFire cache server members.

[Fri Jul 05 22:33:47 papicella@:~/vmware/ant-demos/sqlfire/performance-test ] $ ant
Buildfile: /Users/papicella/vmware/ant-demos/sqlfire/performance-test/build.xml

init:

compile:

package:

run-insert-client:
     [echo] Starting insert client with jvm args : -server -showversion -Xms512m -Xmx512m
     [java] java version "1.6.0_37"
     [java] Java(TM) SE Runtime Environment (build 1.6.0_37-b06-434-11M3909)
     [java] Java HotSpot(TM) 64-Bit Server VM (build 20.12-b01-434, mixed mode)
     [java]
     [java] Starting 8 threads for 1000000 records connecting to jdbc:sqlfire://127.0.0.1:1527/;single-hop-enabled=true
     [java] Start: 875000  End: 1000000
     [java] Start: 125000  End: 250000
     [java] Start: 625000  End: 750000
     [java] Start: 500000  End: 625000
     [java] Start: 750000  End: 875000
     [java] Start: 375000  End: 500000
     [java] Start: 250000  End: 375000
     [java] Start: 0  End: 125000
     [java] Insert batch of 10000 records took | 1541 | milliseconds
     [java] Insert batch of 10000 records took | 1544 | milliseconds
     [java] Insert batch of 10000 records took | 1548 | milliseconds
     [java] Insert batch of 10000 records took | 1551 | milliseconds
     [java] Insert batch of 10000 records took | 1552 | milliseconds
     [java] Insert batch of 10000 records took | 1554 | milliseconds
     [java] Insert batch of 10000 records took | 1560 | milliseconds
     [java] Insert batch of 10000 records took | 1568 | milliseconds
     [java] Insert batch of 10000 records took | 552 | milliseconds
     [java] Insert batch of 10000 records took | 559 | milliseconds
     [java] Insert batch of 10000 records took | 563 | milliseconds
     [java] Insert batch of 10000 records took | 576 | milliseconds
     [java] Insert batch of 10000 records took | 581 | milliseconds
     [java] Insert batch of 10000 records took | 618 | milliseconds
     [java] Insert batch of 10000 records took | 600 | milliseconds
     [java] Insert batch of 10000 records took | 600 | milliseconds
     [java] Insert batch of 10000 records took | 630 | milliseconds
     [java] Insert batch of 10000 records took | 649 | milliseconds
     [java] Insert batch of 10000 records took | 643 | milliseconds
     [java] Insert batch of 10000 records took | 656 | milliseconds
     [java] Insert batch of 10000 records took | 687 | milliseconds
     [java] Insert batch of 10000 records took | 705 | milliseconds
     [java] Insert batch of 10000 records took | 705 | milliseconds
     [java] Insert batch of 10000 records took | 665 | milliseconds
     [java] Insert batch of 10000 records took | 682 | milliseconds
     [java] Insert batch of 10000 records took | 612 | milliseconds
     [java] Insert batch of 10000 records took | 712 | milliseconds
     [java] Insert batch of 10000 records took | 630 | milliseconds
     [java] Insert batch of 10000 records took | 646 | milliseconds
     [java] Insert batch of 10000 records took | 656 | milliseconds
     [java] Insert batch of 10000 records took | 661 | milliseconds
     [java] Insert batch of 10000 records took | 686 | milliseconds
     [java] Insert batch of 10000 records took | 475 | milliseconds
     [java] Insert batch of 10000 records took | 497 | milliseconds
     [java] Insert batch of 10000 records took | 499 | milliseconds
     [java] Insert batch of 10000 records took | 501 | milliseconds
     [java] Insert batch of 10000 records took | 534 | milliseconds
     [java] Insert batch of 10000 records took | 530 | milliseconds
     [java] Insert batch of 10000 records took | 534 | milliseconds
     [java] Insert batch of 10000 records took | 505 | milliseconds
     [java] Insert batch of 10000 records took | 534 | milliseconds
     [java] Insert batch of 10000 records took | 513 | milliseconds
     [java] Insert batch of 10000 records took | 535 | milliseconds
     [java] Insert batch of 10000 records took | 550 | milliseconds
     [java] Insert batch of 10000 records took | 504 | milliseconds
     [java] Insert batch of 10000 records took | 517 | milliseconds
     [java] Insert batch of 10000 records took | 528 | milliseconds
     [java] Insert batch of 10000 records took | 534 | milliseconds
     [java] Insert batch of 10000 records took | 523 | milliseconds
     [java] Insert batch of 10000 records took | 531 | milliseconds
     [java] Insert batch of 10000 records took | 517 | milliseconds
     [java] Insert batch of 10000 records took | 571 | milliseconds
     [java] Insert batch of 10000 records took | 598 | milliseconds
     [java] Insert batch of 10000 records took | 641 | milliseconds
     [java] Insert batch of 10000 records took | 652 | milliseconds
     [java] Insert batch of 10000 records took | 650 | milliseconds
     [java] Insert batch of 10000 records took | 498 | milliseconds
     [java] Insert batch of 10000 records took | 518 | milliseconds
     [java] Insert batch of 10000 records took | 542 | milliseconds
     [java] Insert batch of 10000 records took | 577 | milliseconds
     [java] Insert batch of 10000 records took | 557 | milliseconds
     [java] Insert batch of 10000 records took | 529 | milliseconds
     [java] Insert batch of 10000 records took | 550 | milliseconds
     [java] Insert batch of 10000 records took | 552 | milliseconds
     [java] Insert batch of 10000 records took | 525 | milliseconds
     [java] Insert batch of 10000 records took | 541 | milliseconds
     [java] Insert batch of 10000 records took | 544 | milliseconds
     [java] Insert batch of 10000 records took | 522 | milliseconds
     [java] Insert batch of 10000 records took | 537 | milliseconds
     [java] Insert batch of 10000 records took | 511 | milliseconds
     [java] Insert batch of 10000 records took | 554 | milliseconds
     [java] Insert batch of 10000 records took | 556 | milliseconds
     [java] Insert batch of 10000 records took | 513 | milliseconds
     [java] Insert batch of 10000 records took | 503 | milliseconds
     [java] Insert batch of 10000 records took | 468 | milliseconds
     [java] Insert batch of 10000 records took | 539 | milliseconds
     [java] Insert batch of 10000 records took | 581 | milliseconds
     [java] Insert batch of 10000 records took | 569 | milliseconds
     [java] Insert batch of 10000 records took | 541 | milliseconds
     [java] Insert batch of 10000 records took | 544 | milliseconds
     [java] Insert batch of 10000 records took | 626 | milliseconds
     [java] Insert batch of 10000 records took | 613 | milliseconds
     [java] Insert batch of 10000 records took | 662 | milliseconds
     [java] Insert batch of 10000 records took | 594 | milliseconds
     [java] Insert batch of 10000 records took | 644 | milliseconds
     [java] Insert batch of 10000 records took | 637 | milliseconds
     [java] Insert batch of 10000 records took | 675 | milliseconds
     [java] Insert batch of 10000 records took | 658 | milliseconds
     [java] Insert batch of 10000 records took | 608 | milliseconds
     [java] Insert batch of 10000 records took | 624 | milliseconds
     [java] Insert batch of 10000 records took | 538 | milliseconds
     [java] Insert batch of 10000 records took | 511 | milliseconds
     [java] Insert batch of 10000 records took | 484 | milliseconds
     [java] Insert batch of 10000 records took | 440 | milliseconds
     [java] Insert batch of 10000 records took | 462 | milliseconds
     [java] Insert batch of 10000 records took | 476 | milliseconds
     [java] Elapsed time in seconds 8.502000

BUILD SUCCESSFUL
Total time: 9 seconds
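
Finally, we can verify the load from an sqlf session; the count should match the 1,000,000 records inserted above:

sqlf> connect client '127.0.0.1:1527';
sqlf> select count(*) from person;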