שינוי מצטבר מצטבר באפאצ'י ניצוץ ניצוץ



פוסט בבלוג זה דן בתמורות סטטיסטיות בזרם ניצוצות. למד הכל אודות מעקב מצטבר ומיומנויות לקריירת Hadoop Spark.

תורם על ידי Prithviraj Bose

בבלוג הקודם שלי דנתי בתמורות סטטיות באמצעות תפיסת החלונות של אפאצ'י ספארק סטרימינג. אתה יכול לקרוא את זה כאן .





בפוסט זה אעמוד על פעולות מצטברות מצטברות בזרם אפאצ'י ניצוצות. אם אתה חדש בסטרימינג של ניצוצות אז אני ממליץ לך לקרוא את הבלוג הקודם שלי כדי להבין כיצד חלונות עובדים.

סוגי טרנספורמציה אמיתית בזרם ניצוצות (המשך ...)

> מעקב מצטבר

השתמשנו ב- reduceByKeyAndWindow (...) ממשק API למעקב אחר מצבי המפתחות, אולם חלונות מהווים מגבלות למקרי שימוש מסוימים. מה אם נרצה לצבור את מצבי המקשים לאורך ולא להגביל אותו לחלון זמן? במקרה כזה נצטרך להשתמש updateStateByKey (...) אֵשׁ.



ממשק API זה הוצג ב- Spark 1.3.0 והיה פופולרי מאוד. עם זאת ל- API זה תקורה מסוימת של ביצועים, הביצועים שלו מתדרדרים ככל שגודל המדינות גדל עם הזמן. כתבתי דוגמה כדי להראות את השימוש ב- API זה. אתה יכול למצוא את הקוד כאן .

Spark 1.6.0 הציג ממשק API חדש mapWithState (...) אשר פותר את תקורות הביצועים שמציבה updateStateByKey (...) . בבלוג זה אעמוד על ה- API המסוים הזה באמצעות תוכנית דוגמה שכתבתי. אתה יכול למצוא את הקוד כאן .

לפני שאני צולל למעבר קוד, בואו נחסוך כמה מילים במחסום. עבור כל שינוי סטטיסטי, מחסום הוא חובה. נקודת ביקורת היא מנגנון להחזרת מצב המקשים במקרה שתוכנית הנהג נכשלת. כאשר מנהל ההתקן מתחיל מחדש, מצב המקשים משוחזר מקבצי הבידוק. מיקומי מחסום הם בדרך כלל HDFS או Amazon S3 או כל אחסון אמין. במהלך בדיקת הקוד, ניתן לאחסן גם במערכת הקבצים המקומית.



מסגרת היברידית בנהג סלניום

בתוכנית הדוגמה, אנו מאזינים לזרם טקסט שקע במארח = localhost ו- port = 9999. זה מסמל את הזרם הנכנס ל (מילים, מספר הופעות) ועוקב אחר ספירת המילים באמצעות 1.6.0 API mapWithState (...) . בנוסף, מפתחות ללא עדכונים מוסרים באמצעות StateSpec.timeout ממשק API. אנו בודקים ב- HDFS ותדירות הבדיקה היא כל 20 שניות.

בואו ניצור תחילה הפעלה של ניצוצות ניצוצות,

Spark-streaming-session

אנו יוצרים a checkpointDir ב- HDFS ואז קוראים לשיטת האובייקט getOrCreate (...) . ה getOrCreate API בודק את checkpointDir כדי לראות אם יש מצבים קודמים לשחזור, אם זה קיים, אז הוא משחזר את הפעלת הזרמת הניצוץ ומעדכן את מצבי המפתחות מהנתונים המאוחסנים בקבצים לפני שעוברים עם נתונים חדשים. אחרת זה יוצר הפעלה חדשה של הזרמת ניצוצות.

ה getOrCreate לוקח את שם ספריית המחסום ופונקציה (אותה שמנו createFunc ) שחתימתו צריכה להיות () => StreamingContext .

בואו נבחן את הקוד בפנים createFunc .

שורה מס '2: אנו יוצרים הקשר זורם עם שם התפקיד ל- 'TestMapWithStateJob' ומרווח אצווה = 5 שניות.

שורה 5: הגדר את ספריית המחסום.

שורה מס '8: הגדר את מפרט המצב באמצעות המחלקה org.apache.streaming.StateSpec לְהִתְנַגֵד. ראשית הגדרנו את הפונקציה שתעקוב אחר המצב ואז הגדרנו את מספר המחיצות עבור ה- DStreams שנוצרו שייווצרו במהלך טרנספורמציות עוקבות. לבסוף הגדרנו את פסק הזמן (ל- 30 שניות), אם לא יתקבל עדכון כלשהו למפתח תוך 30 שניות, אז מצב המפתח יוסר.

קו 12 #: הגדר את זרם השקע, שיטח את נתוני האצווה הנכנסים, צור זוג ערך מפתח, התקשר mapWithState , הגדירו את מרווח הבדיקה ל- 20s ולבסוף הדפיסו את התוצאות.

מסגרת הניצוץ מכנה את ה e createFunc עבור כל מפתח עם הערך הקודם והמצב הנוכחי. אנו מחשבים את הסכום ומעדכנים את המצב בסכום המצטבר ולבסוף אנו מחזירים את הסכום עבור המפתח.

מקורות Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

יש לך שאלה עבורנו? אנא הזכיר זאת בסעיף ההערות ונחזור אליך.

פוסטים קשורים:

התחל לעבוד עם Apache Spark & ​​Scala

טרנספורמציות סטטיסטיות עם חלילה בזרם ניצוצות