RDD je veliki skup podataka. Te kolekcije podataka su često toliko velike da ne mogu da stanu na jednom čvoru. U takvim slučajevima podaci moraju da budu podeljeni na više čvorova. Spark automatski partiocioniše podatke po čvorovima.
Osobina | Opis |
---|---|
RDD.getNumPartitions() | Broj particija |
RDD.partitioner | Ako postoji vraća partitioner (HashPartitioner, RangePartitioner, CustomPartitioner) |
sc.defaultParallelism | Defoltni nivo paralelizma definisan u SparkContext |
Spark koristi partiotioner kao algoritam za utvrđivanje na kom čvoru se nalazi specifični podatak. Ako partitioner ima vrednost NONE onda podaci nisu klasifikovani na osnovu karakteristika podataka, već su nasumično distribuirani i uniformno raspoređeni po čvorovima.
Generički RDD
Api poziv | Veličina particije u rezultujućem RDD-u | Koji partitioner je korišćen |
---|---|---|
sc.parallelize() | sc.defaultParallelism | NONE |
sc.textFile() | sc.defaultParallelism ili broj blokova fajla, šta god je veće | NONE |
filter(), map(), flatMap(), distinct() | Isto kao i roditelj RDD | NONE, osim filter koji koristi partitioner roditeljskog RDD-a |
rdd.union(otherRDD) | rdd.partiotions.size+otherRDD.partitions.size | NONE |
rdd.intersection(otherRDD) | max(rdd.partiotions.size,otherRDD.partitions.size) | NONE |
rdd.subtract(otherRDD) | rdd.partiotions.size | NONE |
rdd.cartesian(otherRDD) | rdd.partiotions.size*otherRDD.partitions.size | NONE |
pair RDD
Api poziv | Veličina particije u rezultujućem RDD-u | Koji partitioner je korišćen |
---|---|---|
reduceByKey(), foldByKey(), combineByKey(), groupByKey() | Isto kao i roditelj RDD | HashPartitioner |
sortByKey() | Isto kao i roditelj RDD | RangePartitioner |
mapValues(),flatMapValues() | Isto kao i roditelj RDD | Roditeljski RDD partitioner |
cogroup(), join(), leftOuterJoin(), rightOuterJoin() | Zavisi od osobina RDD-a | HashPartitioner |
Neka je dat fajl korisnici.txt koji sadrži informacije o korisnicima u sledećem formatu (UserID, UserInfo), gde UserID predstavlja jedinstveni identifikator korisnika, a UserInfo predstavlja listu kategorija na koje je korisnik pretplaćen. Aplikacija periodično kombinuje ove podatke sa manjim fajlom koji reprezentuje događaje koji su se dogodili u poslednjih pet minuta. Manji fajlovi se zovu ulaz1.txt, ulaz2.txt, … Podaci su oblika (UserID, LinkInfo) - UserID korisnika koji je kliknuo na link iz kategorije LinkInfo.
Za svaki manji fajl pojedinačno je potrebno prebrojati koliko korisnika je posetilo link (LinkInfo) koji nije u nekoj od kategorija na koje je korisnik pretplaćen (UserInfo).
Predlog generatora podataka
Predlog rešenja
Prethodni kod daje dobar rezultat, ali je neefikasan. Operacija join(), koja se poziva svaki put kada se pozove processNewLogs() ne zna ništa o tome kako su raspoređene particije. Ova operacija će prerasporediti ključeve iz oba RDD preko mreže tako da isti ključevi budu na jednom čvoru. Pošto očekujemo da će RDD userData da sadrži mnogo više podataka u odnosu na events RDD ovo je nepotrebno slanje podataka preko mreže.
Rešenje je jednostavno. Samo treba upotrebiti partitionBy() transformaciju nad userData. Na taj način stavljamo Sparku do znanja da ima hash partitioner na jednom od rdd-ova i da pri join() treba da to ima u vidu.
Napisati pyspark skriptu koja aproksimira vrednosti broja primenom Monte Carlo metode.
Predlog rešenja iz pyspark-shell