Većina Spark transformacija i neke od akcija očekuju da se kao argument prosledi f-ja koju Spark koristi za preračunavanje. Postoje tri načina na koja mogu da se proslede f-je Sparku.
Umesto prosledjivanja argumenata sa referencom na polje potrebno je proslediti lokalnu varijablu sa potrebnim objektom
Spark RDD koristi lenju evaluaciju i nekada ima potrebe da se koristi isti RDD više puta. Ako koristimo isti RDD više puta, predefinisano ponašanje je takvo da Spark za svaku akciju preračunava RDD. Ovo može biti veoma skupo, naručito za iterativne algoritme. Trivijalni primer:
Da bi izbegli preračunavanje RDD-a više puta možemo da zamolimo Spark da kešira podatke. U tom slučaju čvorovi koji preračunavaju RDD skladište njegove delove. Ako čvor koji ima keširane podatke otakže, Spark će te delove podataka ponovo preračunati kada budu bili potrebni. Takođe, moguće je skladištenje podataka sa redudansom, na više čvorova, ako je potrebno da se upravlja otkazom čvorova bez usporavanja. Postoje više nivoa keširanja podataka. Podatke je moguće čuvati na heap-u JVM kao serijalizovane ili neserijalizovane objekte. Takođe, moguće je čuvati i podatke na disku. Podaci na disku se uvek čuvaju serijalizovano.
Nivo | Upotrebljnost prostora | CPU vreme | U memoriji | Na disku |
---|---|---|---|---|
MEMORY_ONLY | Visoka | Niska | Da | Ne |
MEMORY_ONLY_SER | Niska | Visoka | Da | Ne |
MEMORY_AND_DISK | Visoka | Srednja | Neki | Neki |
MEMORY_AND_DISK_SER | Niska | Visoka | Neki | Neki |
DISK_ONLY | Niska | Visoka | Ne | Da |
Potrebno je pozvati persist() pre prve akcije. Ako probamo da keširamo više podataka nego što može da stane u memoriju, Spark će automatski izbaciti najstarije particije Least Recently Used (LRU) algoritmom. Za memory_only nivoe izbačene particije će preračunati kada one budu bile potrebne, a za ostale nivoe izbačene particije će biti zapamćene na disk.
RDD ima i metod unpersist()
` kojim se ručno uklanja RDD iz memorije.
Spark ima specijalne operacije nad RDD-ovima koji sadrže ključ vredsnot parove. Ovi RDD-ovi se zovu pair RDD. Pair RDD su korisni zato što mogu da se koriste operacije koje se izvode nad ključevima paralelno ili regrupišu podatke preko mreže.
Postoji više načina za kreiranje pair RDD. Mnogi formati fajlova direktno vraćaju pair RDD. U ostalim slučajevima možemo da pretvorima regularni u pair RDD. To se postiže korišćenjem f-je map() koja kao rezultat vraća key/values parove.
Transformacije nad jednim RDD-om | |
---|---|
reduceByKey(func) | Kombinuje vrednosti sa istim ključem |
groupByKey() | Grupiše vrednosti sa istim ključem |
combineByKey(createCombiner, mergeValues, mergeCombiners, partitioner) | Kombinuje vrednosti sa istim ključem koristeći različite tipove rezultata |
mapValues(func) | Primenjuje func nad svakom vrednošću unutar RDD-a. |
flatMapValues(func) | Slično kao i mapValues, ali mapira jedan original u 0 ili više vrednosti |
keys() | Vraća RDD koji se sastoji samo od ključeva |
values() | Vraća RDD koji ima samo vrednosti |
sortByKey() | Sortira RDD po ključu. |
Transformacije nad dva RDD-a | |
---|---|
subtractByKey(drugiRDD) | Uklanja elemente sa ključem koji postoji u drugom RDD-u |
join(drugiRDD) | Unutrašnje spajanje dva RDD-a |
rightOuterJoin(drugiRdd) | Desno spajanje dva RDD-a |
leftOuterJoin(drugiRdd) | Levo spajanje dva RDD-a |
cogroup(drugiRdd) | Grupiše podatke koj dele isti ključ iz oba RDD-a |
Pair RDD je izvedena klasa iz RDD tako da podržava iste f-je kao i RDD.
Kada je skup podataka opisan kao ključ/vrednost parovi, najčešće se vrši spajanje svih elemenata sa istim ključem. Akcije reduce(), fold() i aggregate() običnog RDD imaju slične transformacije nad pair RDD-om. Ove f-je kao povratnu vrednost imaju RDD pa se zato zovu transformacije, a ne akcije. F-ja reduceByKey() je slična f-ji reduce() obe uzimaju funkciju i koriste je da bi kombinovale vrednosti. reduceByKey() izvršava više paralelnih operacija redukcije, jednu za svaki ključ u skupu podataka, gde svaka operacija kombinuje vrednosti sa istim ključem. Skup podataka može da ima veliki broj ključeva, pa reduceByKey nije implementirana kao akcija koja vraća rezultat u driver program. Umesto toga, reduceByKey() je transformacija koja kreira novi RDD koji se sastoji od ključa i redukovanih vrednosti za taj ključ.
combineByKey() je najopštija ključ/vrednost agregacija. Kao i aggregate(), combineByKey() dozvoljava povratne vrednosti koje nisu istog tipa kao i ulazni podaci.
Ako su podaci već grupisani na način koji je potreban groupByKey() će grupisati podatke po ključu u RDD-u. Ako se RDD sastojati od ključeva tipa K i vrednosti tipa V nakon groupByKey() RDD će se biti sledećeg tipa [K, Iterable[V]]
countByKey() | Koliko elemeneta ima sa istim ključem |
collectAsMap() | Skuplja podatke kao map-u radi lakšeg pristupa |
lookup(key) | Sve vrednosti koji imaju ključ key |
Napraviti Spark skriptu koji broji koliko puta se svaka reč pojavljuje u fajlu README.md.
Na osnovu podataka finansijskog prometa između kompanija (transfer novca među njima) potrebno je izvršiti kategorizaciju kompanija. U 4 tekstualne datoteke kvartal1.txt, … , kvartal4.txt se u svakoj liniji nalazi transfer novca. Transfer je dat u formatu:
pibuplatilac pibprimalac iznostransfer
gde pib-ovi identifikuju kompanije (poreski identifikacioni broj). U datoteci kategorizacija.txt nalaze se podaci prema kojima se kompanija kategorizuje u formatu:
iznoskat1 . . . iznoskat8
gde je broj kategorizacionih granica 8, definisanih u vrednostima iznoskati, i =1 .. 8.
Ukoliko je iznos transfera 12345.67 , a kategorizacione granice iznoskat3 i iznoskat4 iznose redom 10240.12 i 14567.89 , tada takav transfer za obe kompanije govori da imaju transfer u kategoriji 3 . Ukoliko je iznos transfera manji od iznoskat1, transfer se ne uzima u obzir u kategorizaciji. Za transfer veći od iznoskat8 , uzimamo da obe kompanije imaju transfer u kategoriji 8.
Potrebno je za svaku kompaniju utvrditi koliko je imala transakcija u svakoj kategoriji.
Napomena : Svi iznosi transfera su brojevi u decimalnom zapisu, a pib -ovi su prirodni brojevi.
Generator fajlova kvartal*.txt
Fajl kategorizacija.txt