
VIETNAM NATIONAL UNIVERSITY, HO CHI MINH CITY

UNIVERSITY OF TECHNOLOGY
FACULTY OF COMPUTER SCIENCE AND ENGINEERING

Distributed System Lab

Lab 2: Introduction to Apache Spark & RDD-based Programming

Student: Nguyen Hoang Anh Thu - 2053478

HO CHI MINH CITY, NOVEMBER 2023



Contents
1 Task 1: Filtering Wrong Records

2 Task 2: Preprocessing




1 Task 1: Filtering Wrong Records


Step 1: Create a directory in HDFS and copy the log file from the local machine to HDFS. After that, I check whether the log file has been copied:

hadoop fs -ls /user/S2053478/test1/
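The directory creation and copy commands themselves are not shown above; a minimal sketch of them (assuming the local log file is named FPT-2018-12-02.log, as in the HDFS listing shown later in Task 2) would be:

$ # Sketch: create the target directory in HDFS (same path as checked above)
$ hadoop fs -mkdir -p /user/S2053478/test1/
$ # Sketch: copy the log file from the local machine into HDFS (local filename assumed)
$ hadoop fs -put FPT-2018-12-02.log /user/S2053478/test1/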

Step 2: Create an RDD (Resilient Distributed Dataset) from the log file.
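The report does not list this step explicitly; a minimal sketch, assuming the session is held in the spark_session variable used in Task 2 and that the log file sits at the HDFS path from Step 1, could look like:

>>> from pyspark.sql import SparkSession
>>> # Sketch: obtain a SparkSession (the application name is an assumption)
>>> spark_session = SparkSession.builder.appName("DSLab2").getOrCreate()
>>> # Load the raw log lines from HDFS into an RDD
>>> log_data = spark_session.sparkContext.textFile("/user/S2053478/test1/FPT-2018-12-02.log")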

Step 3: Create a function that filters records according to the criteria and a function that converts the time format. Each log line has seven space-separated fields (latency, client IP, a status field that is HIT or "-", date, timezone offset, requested path, and content size, as the sample records in Listing 3 show), so the filter checks the field count, a non-negative latency, a positive numeric content size, and a status other than "-":
>>> from datetime import datetime
>>> import pytz
>>> def filter_correct_records(line):
...     fields = line.split(" ")
...     criteria = len(fields) == 7 and float(fields[0]) >= 0 and fields[6].isdigit() and int(fields[6]) > 0 and fields[2] != "-"
...     return criteria
...
>>> filtered_data = log_data.filter(filter_correct_records)
>>> def convert_time(line):
...     fields = line.split(" ")
...     input_data = fields[3] + " " + fields[4]
...     time_format = "[%d/%b/%Y:%H:%M:%S %z]"
...     try:
...         timestamp = datetime.strptime(input_data, time_format).replace(tzinfo=pytz.UTC).timestamp()
...         return timestamp
...     except ValueError:
...         return None
...
>>> filtered_data = filtered_data.filter(lambda x: convert_time(x) is not None)
>>> sorted_data = filtered_data.sortBy(convert_time)
>>> filtered_data.count()
1303227
Listing 1: Total number of records

Step 4: Repeat step 3, this time for the incorrect records:


>>> def filter_incorrect_records(line):
...     return not filter_correct_records(line)
...
>>> filtered_fail_data = log_data.filter(filter_incorrect_records)
>>> filtered_fail_data = filtered_fail_data.filter(lambda x: convert_time(x) is not None)
>>> sorted_fail_data = filtered_fail_data.sortBy(convert_time)
>>> filtered_fail_data.count()
593586
Listing 2: Number of wrong records

Step 5: Print out the first 10 correct and incorrect records.


For the correct records:
>>> for record in sorted_data.take(10):
...     print(record)
...
0.000 58.187.29.147 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_ns_hd/prod_kplus_ns_hd.isml/events(1541466558)/dash/prod_kplus_ns_hd-audio_vie=56000-49397873671168.dash 28401
0.055 113.23.26.76 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_1_hd/prod_kplus_1_hd.isml/events(1541466464)/dash/prod_kplus_1_hd-video=2499968-926210122800.dash 1265928
0.000 118.69.60.62 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_pm_hd/prod_kplus_pm_hd.isml/stb.mpd 39902
0.000 42.118.29.197 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_ns_hd/prod_kplus_ns_hd.isml/prod_kplus_ns_hd.mpd 40102
0.000 14.231.34.1 HIT [02/Dec/2018:00:00:00 +0700] /cc8c96ca2e6b3aa3465a5e2d383c8e011543687445/box/_definst_/vtv3-high.m3u8 323
0.001 42.113.129.241 HIT [02/Dec/2018:00:00:00 +0700] /d053d51fe6c3935d4c25908089b7c4961543692607/ndvr/vtv3/_definst_/20181201/vtv3-high-20181201175215-611181.ts 742224
0.002 14.244.239.143 HIT [02/Dec/2018:00:00:00 +0700] /8d7863dc41865b32054cd8478430fb521543689560/box/_definst_/vtv2-high-2063449.ts 760272
0.000 14.229.126.144 HIT [02/Dec/2018:00:00:00 +0700] /8c6bebd4edd78750291593f1756e861a1543686153/box/_definst_/vtv6-high.m3u8 323
0.000 113.22.7.224 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_ns_hd/prod_kplus_ns_hd.isml/events(1541466558)/dash/prod_kplus_ns_hd-audio_vie=56000-49397873798144.dash 28840
0.000 113.179.83.143 HIT [02/Dec/2018:00:00:00 +0700] /98caef0ff9b853515fe1e3858397badf1543685819/box/_definst_/vtv6-high.m3u8 323
Listing 3: Top 10 correct records

For the incorrect records:


>>> for record in sorted_fail_data.take(10):
...     print(record)
...
0.000 123.18.156.7 - [02/Dec/2018:00:00:00 +0700] /b74394d1bd14f1afb0c4be5b8c4c22481543619685/box/_definst_/vtv1-high.m3u8 166
0.000 113.161.6.128 - [02/Dec/2018:00:00:00 +0700] /4176f0256a9c8b9d5cf6854e27ade7461543681587/box/_definst_/vtv3-high.m3u8 166
0.000 42.115.220.10 HIT [02/Dec/2018:00:00:00 +0700] /live/prod_kplus_1_hd/prod_kplus_1_hd.isml/stb.mpd 0
0.000 27.3.65.112 - [02/Dec/2018:00:00:00 +0700] /1c68cc3b620f13797a8dc7d60ea8bec41543571805/box/_definst_/vtv3-high.m3u8 166
0.000 113.162.4.228 - [02/Dec/2018:00:00:00 +0700] /73f2e7f5a352dc48bc3f46ccfe155a701543617776/box/_definst_/vtv3-high.m3u8 166
0.000 14.181.24.168 - [02/Dec/2018:00:00:00 +0700] /9f9acfb050af241723b7576ac79879d81543634619/box/_definst_/vtv3-high.m3u8 166
0.000 14.228.49.147 - [02/Dec/2018:00:00:00 +0700] /ab65523b72105c0ad027936fddc3c5ec1543562106/box/_definst_/vtv3-high.m3u8 166
0.000 123.17.19.243 - [02/Dec/2018:00:00:00 +0700] /45cb0917fe18ec62f2c5af70a9e992b61543632020/box/_definst_/vtv3-high.m3u8 166


0.000 14.189.182.247 - [02/Dec/2018:00:00:00 +0700] /e987966426fe66700b9a0775a08f7c2d1543548337/box/_definst_/vtv3-high.m3u8 166
0.000 14.165.116.89 - [02/Dec/2018:00:00:00 +0700] /6ab3ab2fcc3cb07a5a9f6cdb7784da9a1543675624/box/_definst_/vtv1-high.m3u8 166
Listing 4: Top 10 incorrect records

2 Task 2: Preprocessing
Step 1: Create a function to classify services and build the classified_log_data RDD:
• Apply the function below to each line of the filtered_data RDD produced in Task 1.
• Create a new RDD of key-value pairs (the service_counts variable) whose key is the service group and whose value is the number of records in that group.

>>> def get_service_type(line):
...     content_name = line.split(" ")[5]
...     if content_name.endswith(".mpd") or content_name.endswith(".m3u8"):
...         return "HLS"
...     elif content_name.endswith(".dash") or content_name.endswith(".ts"):
...         return "MPEG-DASH"
...     else:
...         return "Web Service"
...
>>> classified_log_data = filtered_data.map(lambda line: (get_service_type(line), 1))
>>> service_counts = classified_log_data.reduceByKey(lambda a, b: a + b)
>>> for service, count in service_counts.collect():
...     print(f"{service}: {count} records")
...
MPEG-DASH: 826313 records
Web Service: 13976 records
HLS: 462938 records
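As a side note, the same per-service counts could also be obtained with the countByKey() action, which aggregates the (service, 1) pairs and returns a dictionary to the driver; this is only an alternative sketch, not what the report ran:

>>> # Alternative sketch: count pairs per key directly on the driver
>>> for service, count in classified_log_data.countByKey().items():
...     print(f"{service}: {count} records")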

Step 2: Find the unique IPs by creating a function that extracts the IP address from each record of the RDD, then count them:
>>> def extract_ip(line):
...     ip = line.split(" ")[1]
...     return ip
...
>>> unique_ips = filtered_data.map(extract_ip).distinct()
>>> unique_ips.count()
3952

Step 3: Build an RDD containing the map of IPs.

Assume that I have already copied IPDict.csv from the local machine to HDFS.
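That copy step is not shown in the report; a minimal sketch (assuming the local file is also named IPDict.csv) would be:

$ # Sketch: copy the IP dictionary from the local machine into HDFS
$ hadoop fs -put IPDict.csv /user/S2053478/test1/

The hadoop fs -ls output below then confirms that both files are present: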
$ hadoop fs -ls /user/S2053478/test1/
2023-11-13 22:00:08,188 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   2 thunguyen supergroup  282042368 2023-11-13 21:24 /user/S2053478/test1/FPT-2018-12-02.log
-rw-r--r--   2 thunguyen supergroup     278374 2023-11-13 21:59 /user/S2053478/test1/IPDict.csv




Step 4: Analyse the data by enriching each log record with the IP information from IPDict.csv, distributed to the workers as a broadcast variable:


>>> ip_data = spark_session.sparkContext.textFile("IPDict.csv")
>>> ip_info = ip_data.map(lambda line: (line.split(",")[0], (line.split(",")[1], line.split(",")[2], line.split(",")[3]))).collectAsMap()
>>> ip_broadcast = spark_session.sparkContext.broadcast(ip_info)
>>> def enrich_record(line):
...     fields = line.split(" ")
...     ip = fields[1]
...     additional_info = ip_broadcast.value.get(ip, ("Unknown", "Unknown", "Unknown"))
...     latency = float(fields[0])
...     city = additional_info[1]
...     content_size = int(fields[len(fields) - 1])
...     return (ip, additional_info, city, latency, fields[5], content_size)
...
>>> enriched_log_data = filtered_data.map(enrich_record)
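To make the later index-based accesses easier to follow, this is the layout of the tuple returned by enrich_record, as derived from the code above (the first element of additional_info is simply the second CSV column, whatever it holds):

# Tuple layout produced by enrich_record (indices used in Steps 5 and 6):
# log[0] = ip, log[1] = additional_info (element [1] is the city, element [2] the ISP),
# log[2] = city, log[3] = latency, log[4] = requested path, log[5] = content size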

Step 5: Count the unique ISPs, the number of records from Ho Chi Minh City, and the total traffic from Hanoi:


>>> unique_isps = enriched_log_data.map(lambda log: log[1][2]).distinct().collect()
>>> print(f"Number of unique ISPs: {len(unique_isps)}")
Number of unique ISPs: 125

>>> hcm_records = enriched_log_data.filter(lambda log: log[2] == "Ho Chi Minh City")
>>> print(f"Number of records from Ho Chi Minh City: {hcm_records.count()}")
Number of records from Ho Chi Minh City: 217212
Listing 5: Number of records from Ho Chi Minh City

>>> hanoi_traffic = enriched_log_data.filter(lambda log: log[2] == "Hanoi").map(lambda log: log[5]).reduce(lambda a, b: a + b)
>>> print(f"Total traffic from Hanoi: {hanoi_traffic}")
Total traffic from Hanoi: 204245300091
Listing 6: Total traffic from Hanoi

Step 6: Calculate the mean, maximum, and minimum latency values:


>>> from pyspark.mllib.stat import Statistics
>>> from pyspark.mllib.linalg import Vectors
>>> latencies = enriched_log_data.map(lambda log: log[3])
>>> latencies_vector = latencies.map(lambda latency: Vectors.dense(latency))
>>> latency_stats = Statistics.colStats(latencies_vector)
>>> print(f"Mean Latency: {latency_stats.mean()[0]}")
Mean Latency: 0.1516318983569242
>>> print(f"Maximum Latency: {latency_stats.max()[0]}")
Maximum Latency: 199.658
>>> print(f"Minimum Latency: {latency_stats.min()[0]}")
Minimum Latency: 0.0
Listing 7: Mean, maximum, and minimum latencies
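For comparison only, the same three statistics could also be computed with the RDD's built-in stats() action, which returns a StatCounter; this is a sketch, not part of the original run:

>>> # Alternative sketch: aggregate statistics directly on the latency RDD
>>> stat = latencies.stats()
>>> print(f"Mean: {stat.mean()}, Max: {stat.max()}, Min: {stat.min()}")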

GitHub link: DS Lab2

