RDD באמצעות Spark: אבן הבניין של Apache Spark



בלוג זה בנושא RDD באמצעות Spark יספק לך ידע מפורט ומקיף על RDD, שהוא היחידה הבסיסית של Spark & ​​כמה הוא שימושי.

, המילה עצמה מספיקה כדי ליצור ניצוץ במוחו של כל מהנדס Hadoop. ל n בזיכרון כלי עיבוד המהיר ברק במחשוב אשכולות. בהשוואה ל- MapReduce, שיתוף הנתונים בזיכרון הופך RDDs 10-100x מהר יותר מאשר שיתוף רשת ודיסק וכל זה אפשרי בגלל RDDs (ערכות נתונים מבוזרות Resilient). נקודות המפתח שאנו מתמקדים בהן היום ב- RDD באמצעות מאמר Spark הן:

צורך ב- RDD?

מדוע אנו זקוקים ל- RDD? -RDD באמצעות Spark





העולם מתפתח עם ו מדע נתונים בגלל ההתקדמות ב . אלגוריתמים מבוסס על נְסִיגָה , , ו שנמשך מופץ מחשב איטרטיבי אטיון אופנה הכוללת שימוש חוזר ושיתוף נתונים בין יחידות מחשוב מרובות.

המסורתי טכניקות זקוקות לאחסון יציב ביניים ומופץ כמו HDFS הכוללת חישובים חוזרים עם שכפול נתונים וסידור נתונים, שהפכו את התהליך לאיטי הרבה יותר. מעולם לא היה קל למצוא פיתרון.



זה איפה RDDs (מערכי נתונים מבוזרים גמישים) מגיע לתמונה הגדולה.

RDD הם קלים לשימוש וקל ליצור אותם כאשר הנתונים מיובאים ממקורות נתונים והושמטו ל- RDD. יתר על כן, הפעולות מיושמות לעיבודן. הם א אוסף זיכרון מבוזר עם הרשאות כ לקריאה בלבד והכי חשוב, הם כן עמיד בפני תקלות .



אם בכלל מחיצת נתונים שֶׁל ה- RDD הוא אָבֵד , ניתן לחדש אותו על ידי יישום אותו טרנספורמציה המבצע על אותה מחיצה אבודה ב יוּחֲסִין במקום לעבד את כל הנתונים מאפס. סוג זה של גישה בתרחישים בזמן אמת יכול לגרום לנסים לקרות במצבים של אובדן נתונים או כאשר מערכת אינה פועלת.

מהם RDDs?

RDD או ( מערך נתונים מבוזר גמיש ) הוא יסוד מבנה נתונים בספארק. התנאי מִתאוֹשֵׁשׁ מַהֵר מגדיר את היכולת המייצרת את הנתונים באופן אוטומטי או נתונים מתגלגל לאחור אל ה מצב מקורי כאשר מתרחש אסון לא צפוי עם סבירות לאובדן נתונים.

הנתונים שנכתבו ל- RDD הם מחולק ומאוחסן ב מספר צמתים ניתנים להפעלה . אם צומת ביצוע נכשל בזמן הריצה, ואז זה מיד מגביר את הגיבוי הצומת ההפעלה הבא . זו הסיבה ש RDD נחשבים לסוג מתקדם של מבני נתונים בהשוואה למבני נתונים מסורתיים אחרים. RDDs יכולים לאחסן נתונים מובנים, לא מובנים וחצי מובנים.

בואו להתקדם עם ה- RDD שלנו באמצעות הבלוג Spark ונלמד על התכונות הייחודיות של RDD שמקנה לו יתרון על פני סוגים אחרים של מבני נתונים.

תכונות של RDD

  • לזכר (RAM) חישובים : הרעיון של חישוב בזיכרון לוקח את עיבוד הנתונים לשלב מהיר ויעיל יותר שבו הכללי ביצועים של המערכת היא משודרג.
  • ל הערכתו : המונח הערכה עצלנית אומר טרנספורמציות מוחלים על הנתונים ב- RDD, אך הפלט לא נוצר. במקום זאת, התמורות המיושמות הן מחובר.
  • הַתמָדָה : RDDs כתוצאה מכך הם תמיד לשימוש חוזר.
  • פעולות גסות : המשתמש יכול להחיל טרנספורמציות על כל האלמנטים בערכות הנתונים דרך מַפָּה, לְסַנֵן אוֹ קבוצה לפי פעולות.
  • עמיד בפני תקלות : אם יש אובדן נתונים, המערכת יכולה גלגל לאחור שלה מצב מקורי באמצעות המחובר טרנספורמציות .
  • חוסר שינוי : לא ניתן יהיה להגדיר, לאחזר או ליצור נתונים השתנה ברגע שהוא מחובר למערכת. אם אתה זקוק לגישה ולשינוי ה- RDD הקיים, עליך ליצור RDD חדש על ידי החלת קבוצה של טרנספורמציה מתפקד על RDD הנוכחי או הקודם.
  • מחיצה : זה יחידה מכריעה של מקבילות בספארק RDD. כברירת מחדל, מספר המחיצות שנוצרו מבוסס על מקור הנתונים שלך. אתה יכול אפילו להחליט על מספר המחיצות שברצונך לבצע באמצעות מחיצה מותאמת אישית פונקציות.

יצירת RDD באמצעות Spark

ניתן ליצור RDD ב שלוש דרכים:

  1. קריאת נתונים מ אוספים מקבילים
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. מגיש בקשה טרנספורמציה על RDD קודמים
מילים val = spark.sparkContext.parallelize (Seq ('ניצוץ', 'הוא', 'a', 'מאוד', 'חזק', 'שפה')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. קריאת נתונים מ אחסון חיצוני או נתיבי קבצים כמו HDFS אוֹ HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

פעולות המבוצעות על RDD:

ישנם בעיקר שני סוגים של פעולות המבוצעות על RDD, כלומר:

  • טרנספורמציות
  • פעולות

טרנספורמציות : ה פעולות אנו מגישים בקשות ל- RDD ל- מסנן, גישה ו לְשַׁנוֹת הנתונים ב- RDD האב להפקת א RDD עוקב נקרא טרנספורמציה . ה- RDD החדש מחזיר מצביע ל- RDD הקודם ומבטיח את התלות ביניהם.

טרנספורמציות הן הערכות עצלות, במילים אחרות, הפעולות המופעלות על ה- RDD שאתה עובד יירשמו אך לא יצא לפועל. המערכת זורקת תוצאה או חריגה לאחר הפעלת ה- פעולה .

אנו יכולים לחלק טרנספורמציות לשני סוגים להלן:

  • טרנספורמציות צרות
  • טרנספורמציות רחבות

טרנספורמציות צרות אנו מיישמים טרנספורמציות צרות על א מחיצה אחת של ה- RDD האב ליצירת RDD חדש שכן הנתונים הנדרשים לעיבוד ה- RDD זמינים במחיצה אחת של ה- הורה ASD . הדוגמאות לתמורות צרות הן:

  • מַפָּה()
  • לְסַנֵן()
  • flatMap ()
  • חֲלוּקָה()
  • mapPartitions ()

טרנספורמציות רחבות: אנו מיישמים את השינוי הרחב על מחיצות מרובות כדי ליצור RDD חדש. הנתונים הנדרשים לעיבוד ה- RDD זמינים במחיצות מרובות של ה- הורה ASD . הדוגמאות לתמורות רחבות הן:

  • להפחית על ידי ()
  • הִתאַחֲדוּת()

פעולות : פעולות מנחות את Apache Spark להגיש בקשה חישוב ולהעביר את התוצאה או חריג בחזרה ל- RDD של הנהג. מעטים מהפעולות כוללות:

מבנה של תוכנית Java
  • לאסוף()
  • לספור()
  • לקחת()
  • ראשון()

הבה נשתמש באופן מעשי בפעולות על RDD:

IPL (ליגת העל ההודית) הוא טורניר קריקט עם המותן שלו ברמת שיא. אז, מאפשר לנו היום לשים את ידנו על מערך הנתונים של ה- IPL ולבצע את ה- RDD שלנו באמצעות Spark.

  • קוֹדֶם כֹּל, בואו להוריד נתוני התאמת CSV של IPL. לאחר הורדתו, הוא מתחיל להיראות כקובץ EXCEL עם שורות ועמודות.

בשלב הבא אנו מפעילים את הניצוץ וטוענים את קובץ matches.csv ממיקומו, במקרה שלי שליcsvמיקום הקובץ הוא “/ משתמש / edureka_566977/test/matches.csv”

עכשיו בואו נתחיל עם טרנספורמציה חלק ראשון:

  • מַפָּה():

אנו משתמשים שינוי טרנספורמציה ליישם פעולת טרנספורמציה ספציפית על כל אלמנט של RDD. כאן אנו יוצרים RDD בשם CKfile שם מאחסן את שלנוcsvקוֹבֶץ. ניצור RDD נוסף שנקרא States to לאחסן את פרטי העיר .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • לְסַנֵן():

שינוי מסנן, השם עצמו מתאר את השימוש בו. אנו משתמשים בפעולת טרנספורמציה זו כדי לסנן את הנתונים הסלקטיביים מתוך אוסף נתונים שניתן. אנו פונים פעולת סינון כאן כדי לקבל את הרשומות של משחקי ה- IPL של השנה 2017 ולאחסן אותו ב- RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

אנו מיישמים flatMap היא פעולת טרנספורמציה לכל אחד מהאלמנטים של RDD ליצירת RDD חדש. זה דומה לשינוי מפה. כאן אנו פוניםFlatmapל לירוק את הגפרורים של העיר היידראבאד ולאחסן את הנתוניםfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('היידראבאד')). collect ()

  • חֲלוּקָה():

כל נתונים שאנו כותבים ל- RDD מחולקים למספר מסוים של מחיצות. אנו משתמשים בשינוי זה כדי למצוא את מספר מחיצות הנתונים למעשה מפוצלים ל.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

אנו רואים MapPatitions כחלופה של Map () ו-לכל אחד() ביחד. אנו משתמשים כאן ב- MapPartitions כדי למצוא את ה- מספר שורות יש לנו ב- RDD שלנו.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • להפחית על ידי ():

אנו משתמשיםלהפחית על ידי() מופעל זוגות ערך ערך . השתמשנו בשינוי זה על שלנוcsvקובץ כדי למצוא את הנגן עם האיש הגבוה ביותר של המשחקים .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • הִתאַחֲדוּת():

השם מסביר הכל, אנו משתמשים בשינוי האיחוד הוא כדי מועדון שני RDD יחד . כאן אנו יוצרים שני RDD, כלומר fil ו- fil2. fil RDD מכיל את הרשומות של התאמות IPL לשנת 2017 ו- fil2 RDD מכיל את רשומת התאמות IPL לשנת 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

בואו נתחיל עם ה- פעולה חלק שבו אנו מציגים תפוקה בפועל:

  • לאסוף():

איסוף היא הפעולה בה אנו משתמשים להציג את התוכן ב- RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • לספור():

לספורהיא פעולה בה אנו משתמשים כדי לספור את מספר רשומות נוכח ב- RDD.כאןאנו משתמשים בפעולה זו כדי לספור את מספר הרשומות הכולל בקובץ matches.csv שלנו.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • לקחת():

Take היא פעולת פעולה הדומה לאיסוף, אך ההבדל היחיד הוא שהיא יכולה להדפיס כל שהיא מספר שורות סלקטיבי לפי בקשת המשתמש. כאן אנו מיישמים את הקוד הבא להדפסת ה- עשרת הדוחות המובילים ביותר.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. קח (10) .foreach (println)

  • ראשון():

ראשית () היא פעולת פעולה הדומה לאיסוף () ולקחת ()זהנהג להדפיס את הדוח העליון ביותר של הפלט כאן אנו משתמשים בפעולה הראשונה () כדי למצוא את המספר המרבי של משחקים ששוחקו בעיר מסוימת ואנחנו מקבלים את מומבאי כמוצא.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y). map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

כדי להפוך את התהליך שלנו ללימודי RDD שלנו באמצעות Spark, עוד יותר מעניין, הבאתי מקרה שימוש מעניין.

RDD באמצעות Spark: מקרה שימוש בפוקימון

  • קוֹדֶם כֹּל, תן לנו להוריד קובץ Pokemon.csv ולהעמיס אותו על מעטפת הניצוץ כפי שעשינו לקובץ Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

פוקימונים זמינים למעשה במגוון גדול, בואו נמצא כמה זנים.

  • הסרת סכמה מקובץ Pokemon.csv

אולי לא נצטרך את סכֵימָה של קובץ Pokemon.csv. לפיכך, אנו מסירים אותו.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • מציאת מספר מחיצות הפוקימון שלנו. csv מופץ ב.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • פוקימון מים

למצוא את מספר פוקימונים של מים

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • פוקימון אש

למצוא את מספר פוקימונים אש

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • אנחנו יכולים גם לזהות את אוּכְלוֹסִיָה מסוג פוקימונים מסוג אחר המשתמש בפונקציית הספירה
WaterRDD.count () FireRDD.count ()

  • מאז אני אוהב את המשחק של אסטרטגיה הגנתית בואו נמצא את הפוקימון עם הגנה מרבית.
val defenceList = NoHeader.map {x => x.split (',')}. מפה {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • אנחנו יודעים את המקסימום ערך כוח הגנה אבל אנחנו לא יודעים באיזה פוקימון מדובר. אז בואו נמצא מה זה פוקימון.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. מפה {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (מזמין [כפול]. Reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • עכשיו בואו נסתדר עם הפוקימון הכי פחות הגנה
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • עכשיו בואו נראה את הפוקימון עם אסטרטגיה פחות הגנתית.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPeader .מפה {x => x.split (',')}. מפה {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (הזמנה [כפול ]. על (_._ 1)) MinDefencePokemon2.foreach (println)

אז, עם זה, אנו מגיעים לסוף RDD זה באמצעות מאמר Spark. אני מקווה שעוררנו מעט אור על הידע שלך אודות RDD, תכונותיהם וסוגים שונים של פעולות שניתן לבצע בהם.

מאמר זה מבוסס על נועד להכין אותך לקראת בחינת ההסמכה של Cloudera Hadoop ו- Spark Developer (CCA175). תקבל ידע מעמיק על אפאצ'י ספארק והמערכת האקולוגית של Spark, הכוללת Spark RDD, Spark SQL, Spark MLlib ו- Spark Streaming. תקבל ידע מקיף על שפת תכנות Scala, HDFS, Sqoop, Flume, Spark GraphX ​​ומערכת העברת הודעות כגון Kafka.