Product and Technology

Operationalizing Data Pipelines With Snowpark Stored Procedures, Now in Preview

Operationalizing Data Pipelines With Snowpark Stored Procedures, Now in Preview

Following the recent GA of Snowpark for our customers on AWS, we’re happy to announce that Snowpark Scala stored procedures are now available in preview to all customers on all clouds.

Snowpark provides a language-integrated way to build and run data pipelines using powerful abstractions like DataFrames. With Snowpark, you write a client-side program to describe the pipeline you want to run, and all of the heavy lifting is pushed right into Snowflake’s elastic compute engine.

But once you’ve built and tested a pipeline, the next step is to operationalize it. To do that you need to find a home to host and schedule that client program, and until now that home has had to be somewhere outside of Snowflake.

With Snowpark Stored Procedures, this changes completely. Now you can host your pipelines right inside Snowflake, using a Snowflake virtual warehouse as the compute framework, and integrating naturally with Snowflake features like Tasks for scheduling. This simplifies the end-to-end story by reducing the number of systems involved and by keeping everything self-contained in Snowflake.

And the best part is that while Snowpark Stored Procedures are incredibly powerful, they’re also very simple to use.

Let’s look at an example. You can build some extraordinarily complex processes in Snowpark, but to keep our focus on stored procedures, we’ll make our example pretty straightforward:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
 
object Main {
 def main(args: Array[String]): Unit = {
   // Connect
   val sess = Session.builder.configFile("config_demo.prop").create
 
   // run the pipeline
   println(processSales(sess))
 }
 
 def processSales(sess: Session): String = {
   import sess.implicits._
 
   // UDF to bucket sales by amount
   val qualifySale = udf((price:Float) =>
     if (price < 0.00) "ERROR"
     else if (price < 100.00) "Small"
     else if (price < 10000.00) "Medium"
     else "Large"
   )
 
   // Create and run the pipeline
   sess.table("raw_sales")
       .withColumn("size", qualifySale('sale_amount))
       .join(sess.table("catalog"), 'item_id === 'item)
       .drop('item)
       .write.saveAsTable("processed_sales")
 
   return "Success!"
 }
}

Note that we’ve broken our pipeline into two parts. There’s a very simple main() routine that simply creates a session from a configuration stored in a file and then calls into the processSales() function that actually implements the pipeline. We’ll come back to this division in a moment.

The processSales() function is also pretty straightforward: it creates a simple UDF to qualify sales by amount, and then uses this and a join with a catalog to transform some raw sales into processed tables. For example, it will transform this:

Into this:

But now we want to operationalize this pipeline and run it every night. To do that, we’re first going to create a Snowpark stored procedure. We can create these from JARs if we have complex code or want to bring along libraries, but for a simple example like this, we can simply copy-and-paste the code above into simple inline SQL syntax:

create or replace procedure processSales() returns string
language scala
runtime_version=2.12
packages=('com.snowflake:snowpark:latest')
handler = 'Main.processSales'
target_path = '@jars/processSales.jar'
as
$$
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Connect
    val sess = Session.builder.configFile("config_demo.prop").create

    // run the pipeline
    println(processSales(sess))
  }

  def processSales(sess: Session): String = {
    import sess.implicits._

    // UDF to bucket sales by amount
    val qualifySale = udf((price:Float) => 
      if (price < 0.00) "ERROR"
      else if (price < 100.00) "Small"
      else if (price < 10000.00) "Medium"
      else "Large"
    )

    // Create and run the pipeline
    sess.table("raw_sales")
        .withColumn("size", qualifySale('sale_amount))
        .join(sess.table("catalog"), 'item_id === 'item)
        .drop('item)
        .write.saveAsTable("processed_sales")

    return "Success!"
  }
}
$$;

Note two things in this example: First, we’ve specified processSales() as the handler for this procedure. The main() routine is not actually used, and could be removed completely from the stored procedure. Second, the Scala processSales() takes a parameter — a Session — whereas the procedure exposed to SQL takes no parameters.

This is a general pattern. Snowflake will always inject a Session object as the first parameter of the call it makes to your handler. This Session routes back to the session on which the stored procedure was called. This provides a simple link back to Snowflake, and it also makes it very easy to build code like our example that can be run and tested outside Snowflake, and then brought into Snowflake for use in a stored procedure.

And with our stored procedure in place, we can call it:

call processed_sales();

And we can schedule it with tasks:

create or replace task process_sales_nightly
warehouse = 'small'
schedule = '1440 minute'
as
call processed_sales();

And that’s Snowpark Stored Procedures in a nutshell: a simple feature that lets you simplify your pipelines by hosting and scheduling them right inside Snowflake. Test it out using the Snowpark Quickstart. For more information, please see the documentation.

Happy hacking!

Share Article

Subscribe to our blog newsletter

Get the best, coolest and latest delivered to your inbox each week

Start your 30-DayFree Trial

Try Snowflake free for 30 days and experience the AI Data Cloud that helps eliminate the complexity, cost and constraints inherent with other solutions.