"""Explore a container-shipping dataset with Spark SQL.

Loads a parquet file of containers into a DataFrame, runs equivalent
queries through the DataFrame API and through SQL on a temp view,
registers a UDF, and joins against ISO container-type code descriptions
loaded from JSON.
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local").appName("container").getOrCreate()

# spark.read defaults to the parquet format, so no explicit .format() is needed.
df = spark.read.load('data/containers_tiny.parquet')
df.printSchema()

# Using the DataFrame API: ships registered in Denmark.
df.select("ship_imo", "ship_name", "country").filter(df['country'] == 'DK').show()

# Register a temp view so the same data can also be queried with SQL.
df.createOrReplaceTempView("container")
spark.sql("SELECT ship_imo, ship_name FROM container WHERE country = 'DK'").show()

# Per ship: number of containers and total net weight.
# NOTE: renamed from total_weight_rdd — spark.sql returns a DataFrame, not an RDD.
total_weight_df = spark.sql("SELECT ship_imo, count(container_id) number, sum(net_weight) total_weight FROM container GROUP BY ship_imo")
total_weight_df.printSchema()
total_weight_df.show()
# print(total_weight_df.rdd.map(lambda r: r['number']).collect())

# UDFs: net weight (kg) -> metric tonnes. The return type is declared
# explicitly because spark.udf.register defaults to StringType, which would
# silently stringify the numeric result.
spark.udf.register('en_toneladas', lambda c: float(c) / 1000.0, DoubleType())
spark.sql("SELECT en_toneladas(net_weight) toneladas, net_weight FROM container WHERE container_id = 'FMBV1684747'").show()

# JOINs: attach the description of each ISO container-type code.
codes = spark.read.json('data/iso-container-codes.json')
codes.createOrReplaceTempView('codes')
codes.printSchema()
codes.show()

w_desc = spark.sql("SELECT c.container_id, s.code, s.description FROM container c JOIN codes s on c.container_type = s.code")
w_desc.show()
print(w_desc.groupBy("code").count().take(3))

# Release the session's resources (the original script never stopped it).
spark.stop()
0 commit comments