Monday 18 February 2013

NOTE: DS2, Threaded Processing

In my recent posts on DS2 (DATA step evolved), I showed the basic syntax plus packages & methods, and I showed the use of SQL within a SET statement. In today's post, I'll show the biggest raison d'être for DS2 - the ability to run your code in threads to make it finish its job more quickly.

"Big data", that's the big talking point. One of the key principles of performing speedy analytics on big data is to split the data across multiple processors and disks, to send the code to the distributed processors and disks, have the code run on each processor against its sub-set of data, and to collate the results back at the point from which the request was originally made. Thus, we're sending code to the data rather than pulling the data to the code. It's quicker to send a few dozen lines of code to many processors than it is to pull many millions of rows of data to one (big) processor.

DS2 was designed for data manipulation and data modeling applications. DS2 also enhances a SAS programmer’s repertoire with object-based tools by providing data abstraction using packages and methods. DS2 executes both within a SAS session by using PROC DS2, and within selected databases where the SAS Embedded Process is installed.

Here's a simple example. In summary, it shows how the use of eight threads reduces the turnaround time of the task from 24 seconds in a conventional DATA step to 4.4 seconds in a call to DS2 with eight threads.

/*****************************/
/* Create a chunky data set. */
/* Then read it:             */
/* a. With one thread        */
/* b. With eight threads     */
/* c. Using "old" DATA step  */
/*****************************/

options msglevel=n;
options cpucount=actual;

proc options option=threads;run;
SAS (r) Proprietary Software Release 9.2 TS2M3
THREADS Threads are available for use with features of the SAS System that support threading
proc options option=cpucount;run;
SAS (r) Proprietary Software Release 9.2 TS2M3
CPUCOUNT=24 Number of processors available.

/****************************/
/* Create a chunky data set */
/****************************/
data work.jmaster;
  do j = 1 to 10e6;
    output;
  end;
run;

/**************************/
/* Now read it three ways */
/**************************/

/* But first define the threaded code thread */
proc ds2;
  thread r /overwrite=yes;
    dcl double count;
    method run();
      set work.jmaster;
      count+1;
      do k=1 to 100;/* Add some gratuitous computation! */
        x=k/count + k/count + k/count;
      end;
    end;
    method term();
      OUTPUT;
    end;
  endthread;
run;
NOTE: Execution succeeded. No rows affected.
quit;

/* One thread */
proc ds2;
  data j1(overwrite=yes);
    dcl thread r r_instance;
    dcl double count;
    method run();
      set from r_instance threads=1;
      total+count;
    end;
  enddata;
run;
NOTE: Execution succeeded. One row affected.
quit;
NOTE: PROCEDURE DS2 used (Total process time):
real time 25.09 seconds
cpu time 25.16 seconds

/* Eight threads */
proc ds2;
  data j8(overwrite=yes);
    dcl thread r r_instance;
    dcl double count;
    method run();
      set from r_instance threads=8;
      total+count;
    end;
  enddata;
run;
NOTE: Execution succeeded. 8 rows affected.
quit;
NOTE: PROCEDURE DS2 used (Total process time):
real time 4.40 seconds
cpu time 32.96 seconds

/* And read it in DATA step */
data jold;
  set work.jmaster end=finish;
  count+1;
  do k=1 to 100;/* Add some gratuitous computation! */
    x=k/count + k/count + k/count;
  end;
  if finish then output;
run;
NOTE: There were 10000000 observations read from the data set WORK.JMASTER.
NOTE: The data set WORK.JOLD has 1 observations and 4 variables.
NOTE: DATA statement used (Total process time):
real time 23.98 seconds
cpu time 23.75 seconds

The code does six things:
  • Firstly, it sets the number of CPUs available to the SAS task with CPUCOUNT=ACTUAL, which resolves to 24 here (this box has 12 cores, each with two hyperthreads)
  • Next, it creates a sample data set (to be read in the subsequent steps)
  • It then uses DS2 to define a thread - like a function or a method. The thread will read a row from the sample data set, increment a count and do some arbitrary compute activity (to ensure the test exercise isn't I/O-bound, thereby preventing DS2 from showing its capabilities)
  • Next, it executes DS2 again. The previously defined thread is used inside an executable piece of code (once, because threads=1). This takes 25 seconds elapsed time (and 25 seconds CPU time)
  • Now we execute the same code with DS2 but we use eight threads (each processing one eighth of the input records). This takes 4.4 seconds to complete (but consuming a total of 33 seconds of CPU time across the eight threads)
  • Finally, we execute a traditional DATA step to do the same task. Not surprisingly, it takes a similar amount of time to complete the task as the single-threaded DS2 code
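For anyone who prefers every variable declared, here is a minimal sketch of the same thread-plus-DATA-program pattern written more explicitly. It is my own variation, not the code that produced the log: the names r_explicit, t, n, total and work.jcount are made up for illustration, and I'm assuming work.jmaster still exists from the earlier step.

```sas
proc ds2;
  /* Hypothetical thread: counts the rows it is handed */
  thread r_explicit / overwrite=yes;
    dcl double n;          /* rows read by this thread */
    retain n;
    keep n;
    method init();
      n = 0;               /* start each thread's counter at zero */
    end;
    method run();
      dcl double k x;      /* local work variables, not written out */
      set work.jmaster;
      n = n + 1;
      do k = 1 to 100;     /* same gratuitous computation as before */
        x = k/n + k/n + k/n;
      end;
    end;
    method term();
      output;              /* one row per thread, at the end */
    end;
  endthread;
run;

  /* Hypothetical DATA program: adds up the per-thread counts */
  data work.jcount(overwrite=yes);
    dcl thread r_explicit t;
    dcl double total;
    retain total;
    keep total;
    method init();
      total = 0;
    end;
    method run();
      set from t threads=8;
      total = total + n;
    end;
    method term();
      output;              /* a single row holding the grand total */
    end;
  enddata;
run;
quit;
```

If the sketch behaves as I expect, work.jcount should end up with one row whose total equals the number of rows in work.jmaster. The timings discussed in this post are those of the logged run, not of this sketch.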
So, we see how the use of eight threads reduced the turnaround time by a factor of 25/4.4=5.7. Not a factor of 8, but not bad nonetheless!
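The arithmetic behind that factor is easy to reproduce. The little DATA step below is just a sketch that plugs in the real and CPU times from the log above; the variable names are my own.

```sas
data _null_;
  /* Timings copied from the log, in seconds */
  real_1  = 25.09;   /* one thread, real time     */
  real_8  = 4.40;    /* eight threads, real time  */
  cpu_1   = 25.16;   /* one thread, CPU time      */
  cpu_8   = 32.96;   /* eight threads, CPU time   */
  threads = 8;

  speedup    = real_1 / real_8;     /* how much sooner the job finished   */
  efficiency = speedup / threads;   /* share of the ideal 8x we achieved  */
  cpu_ratio  = cpu_8 / cpu_1;       /* extra CPU spent to get the speedup */

  put speedup= 5.2 efficiency= percent8.1 cpu_ratio= 5.2;
run;
```

By hand, that works out to a speedup of about 5.7, which is roughly 71% of the ideal factor of 8, bought with about 1.3 times the CPU time of the single-threaded run.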

This example suggests that great benefit can be achieved with DS2 threads. In our simple case, the code was compute-bound. If your task is more I/O-bound then the benefits may be less predictable. Jason, from the DS2 development team, recently told me:
When DS2 does I/O, it starts one "reader" thread. This reader thread requests blocks of data from a data source and puts the blocks on a queue. Then, either the DS2 "DATA" program fetches those blocks off the queue or DS2 "THREAD" programs fetch the blocks off the queue.

The key here is there is one reader thread and one or more "compute" threads. As long as the reader thread can keep up with the compute threads, we should see a speed up in execution time. One reader thread works well for most data sources as data sources usually present one source to read from.

With a data source like SPDE, there could be multiple data sources across multiple devices. Right now DS2 does not take advantage of having multiple reader threads. However, I believe the architecture is flexible enough to allow multiple reader threads.

At this point, we have been discussing I/O on a single machine. SAS High-Performance Analytics and SAS Scoring Accelerator execute DS2 in parallel across many machines. The I/O model for these types of systems is different and enables DS2 programs to better take advantage of multiple storage devices.
So, as is usually the case when we talk of performance, a lot depends on your hardware architecture and the amount of effort you put into the tuning of your architecture and code. Nonetheless, with DS2 it seems that there are benefits aplenty to be had.
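One way to see how your own hardware responds is to rerun the threaded step with a range of thread counts and compare the real and CPU times that PROC DS2 writes to the log. The macro below is only a sketch: try_threads and its counts parameter are names I've invented, and it assumes the thread r defined earlier in this post is still available in WORK.

```sas
%macro try_threads(counts=1 2 4 8);
  %local i n;
  %do i = 1 %to %sysfunc(countw(&counts));
    %let n = %scan(&counts, &i);

    /* Same DATA program as in the post, but with a varying thread count */
    /* and no output table, so that only the timings differ.             */
    proc ds2;
      data _null_;
        dcl thread r r_instance;
        method run();
          set from r_instance threads=&n;
        end;
      enddata;
    run;
    quit;
  %end;
%mend try_threads;

%try_threads(counts=1 2 4 8)
```

If real time stops falling as the thread count rises while CPU time keeps climbing, the single reader thread (or the disk behind it) is a likely suspect, though I would treat that as a hint rather than a diagnosis.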

In my next post, I'll wrap up the topic with a few extra details.


NOTE: DS2. Data Step Evolved?
NOTE: DS2, Learn Something New!
NOTE: DS2, SQL Within a SET Statement
NOTE: DS2, Threaded Processing
NOTE: DS2, Final Comments
NOTE: DS2, Final, Final Comments