UDF חזיר אפאצ'י: חלק 1 - פונקציות הערכה, צבירה וסינון



פוסט זה מתאר אודות Apache חזיר UDF - פונקציות Eval, Aggregate & Filter. התבונן בפונקציות Eval, Aggregate & Filter.

Apache Pig מספק תמיכה נרחבת בפונקציות המוגדרות על ידי המשתמש (UDF) כדרך לציין עיבוד מותאם אישית. כרגע ניתן להפעיל UDFs חזיר בשלוש שפות: Java, Python, JavaScript ו- Ruby. התמיכה הנרחבת ביותר ניתנת לפונקציות Java.





ניתן להפעיל Java UDF באמצעות מספר דרכים. ה- UDF הפשוט ביותר יכול פשוט להרחיב את EvalFunc, מה שדורש ליישם את פונקציית ה- exec בלבד. כל UDF של Eval חייב ליישם זאת. בנוסף, אם פונקציה היא אלגברית, היא יכולה ליישם ממשק אלגברי כדי לשפר משמעותית את ביצועי השאילתות.

חשיבות UDF בחזיר:

חזיר מאפשר למשתמשים לשלב מפעילים קיימים עם קוד משלהם או של קוד אחר באמצעות UDF. היתרון של חזיר הוא ביכולתו לאפשר למשתמשים לשלב את המפעילים שלה עם הקוד שלהם או של אחרים באמצעות UDF. עד גרסה 0.7, כל ה- UDF חייבים להיות כתובים ב- Java ומיושמים כשיעורי Java. זה מקל על הוספת UDFs לחזיר על ידי כתיבת מחלקת Java והודעת חזיר על קובץ ה- JAR.



חזיר עצמו מגיע עם כמה UDFs. לפני גרסה 0.8, זו הייתה מערכת מוגבלת מאוד עם פונקציות הצבירה הסטנדרטיות של SQL בלבד וכמה אחרות. ב- 0.8 נוסף מספר גדול של UDFs סטנדרטיים לעיבוד מחרוזות, מתמטיקה וסוג מורכב.

מה זה פיגי בנק?

Piggybank הוא אוסף של UDFs שתורמו על ידי המשתמש אשר משתחרר יחד עם Pig. UDFs של Piggybank אינם כלולים ב- PAR JAR, לכן עליכם לרשום אותם באופן ידני בסקריפט שלכם. אתה יכול גם לכתוב UDF משלך או להשתמש בכאלה שנכתבו על ידי משתמשים אחרים.

פונקציות הערכה

מחלקת UDF מרחיבה את מחלקת EvalFunc המהווה בסיס לכל פונקציות ה- Eval. כל פונקציות ההערכה מרחיבות את מחלקת Java 'org.apache.pig.EvalFunc. 'זה פרמטר עם סוג ההחזרה של ה- UDF שהוא מחרוזת Java במקרה זה. שיטת הליבה במחלקה זו היא 'exec.' השורה הראשונה של הקוד מציינת שהפונקציה היא חלק מחבילת myudfs.



זה לוקח רשומה אחת ומחזיר תוצאה אחת, שתופעל עבור כל רשומה שעוברת בצינור הביצוע. זה לוקח tuple, המכיל את כל השדות שהתסריט מעביר ל- UDF שלך כקלט. לאחר מכן הוא מחזיר את הסוג שבו פרמטת את EvalFunc.

פונקציה זו מופעלת בכל כפולת קלט. הקלט לפונקציה הוא תפל עם פרמטרי קלט לפי סדר העברתם לפונקציה בסקריפט החזיר. בדוגמה המוצגת למטה, הפונקציה לוקחת מחרוזת כקלט. הפונקציה הבאה ממירה את המחרוזת מקט באותיות רישיות. כעת, לאחר שהפונקציה מיושמת, היא צריכה להיאסף ולהיכלל ב- JAR.

מהו אירוע JavaScript
חבילה myudfs יבוא java.io.IOException ייבוא ​​org.apache.pig.EvalFunc ייבוא ​​org.apache.pig.data.Tuple מחלקה ציבורית UPPER מרחיב EvalFunc {ציבורי מחרוזת exec (קלט Tuple) זורק IOException {אם (קלט == null || input.size () == 0) להחזיר null לנסות {String str = (String) input.get (0) return str.toUpperCase ()} לתפוס (חריג e) {לזרוק IOException חדש ('נתפס שורת קלט לעיבוד חריג', ה)}}}

פונקציות מצטברות:

פונקציות מצטברות הן סוג נפוץ אחר של פונקציית Eval. פונקציות מצטברות מוחלות בדרך כלל על נתונים מקובצים. הפונקציה Aggregate לוקחת תיק ומחזירה ערך סקלרי. מאפיין מעניין ורב ערך של פונקציות מצטברות רבות הוא שניתן לחשב אותן באופן הדרגתי באופן מבוזר. בעולם Hadoop המשמעות היא שהחישובים החלקיים יכולים להתבצע על ידי Map ו- Combiner ואת התוצאה הסופית ניתן לחשב על ידי Reducer.

חשוב מאוד לוודא כי פונקציות אגרגטיות שהן אלגבריות מיושמות ככאלה. דוגמאות מסוג זה כוללות את COUNT, MIN, MAX ו- AVERAGE המובנים.

לספור היא דוגמה לפונקציה אלגברית בה אנו יכולים לספור את מספר האלמנטים בתת-קבוצה של הנתונים ואז לסכם את הספירות כדי לייצר פלט סופי. בואו נסתכל על יישום הפונקציה COUNT:

class public COUNT מרחיב EvalFunc מיישם אלגברי {public long exec (קלט Tuple) זורק IOException {return count (input)} public String getInitial () {return Initial.class.getName ()} public String getIntermed () {return Intermed.class. getName ()} ציבורי מחרוזת getFinal () {return Final.class.getName ()} מחלקה ציבורית סטטית ראשונית מרחיבה EvalFunc {public Tuple exec (קלט Tuple) זורק IOException {return TupleFactory.getInstance (). newTuple (ספירה (קלט)) }} מחלקה ציבורית סטטית Intermed מרחיבה EvalFunc {public Tuple exec (קלט Tuple) זורק IOException {return TupleFactory.getInstance (). newTuple (סכום (קלט))}} מחלקה ציבורית סטטית Final extends EvalFunc {public Tuple exec (קלט Tuple) זורק IOException {החזר סכום (קלט)}} מוגן סטטי ספירה ארוכה (קלט Tuple) זורק ExecException {ערכי אובייקט = input.get (0) אם (ערכי מופע של DataBag) מחזירים ((DataBag) ערכים). גודל () אחר אם (ערכים מופע של מפה) להחזיר ערכים חדשים ארוכים (((מפה)) .גודל ())} סטטי מוגן סטטי (Tuple i nput) זורק ExecException, NumberFormatException {DataBag ערכים = (DataBag) input.get (0) סכום ארוך = 0 עבור (Iterator (Tuple) it = values.iterator () it.hasNext ()) {Tuple t = it.next ( ) סכום + = (ארוך) t.get (0)} סכום החזר}}

COUNT מיישם ממשק אלגברי שנראה כך:

ממשק ציבורי אלגברי {מחרוזת ציבורית getInitial () מחרוזת ציבורית getIntermed () מחרוזת ציבורית getFinal ()}

כדי שהפונקציה תהיה אלגברית, עליה ליישם ממשק אלגברי המורכב מהגדרה של שלוש מחלקות שמקורן ב- EvalFunc. החוזה הוא שהפעולה האקטיבית של המחלקה הראשונית נקראת פעם אחת ומועברת לטפל הקלט המקורי. תפוקתו היא טופל המכיל תוצאות חלקיות. ניתן לכנות את פונקציית ה- exec של מחלקת ה- Intermed פעמים או יותר ולוקחת כקולתה tuple המכיל תוצאות חלקיות המיוצרות על ידי המחלקה הראשונית או על ידי קריאות קודמות של מחלקת ה- Intermed ומייצר tuple עם תוצאה חלקית אחרת. לבסוף, פונקציית ה- exec של המחלקה Final נקראת ונותנת את התוצאה הסופית כסוג סקלרי.

פונקציות סינון:

פונקציות סינון הן פונקציות Eval שמחזירות ערך בוליאני. ניתן להשתמש בו בכל מקום בו מתאים ביטוי בוליאני, כולל אופרטור FILTER או ביטוי Bincond. אפאצ'י חזיר אינו תומך בבוליאנית לחלוטין, ולכן פונקציות סינון אינן יכולות להופיע בהצהרות כגון 'Foreach', שם התוצאות מופיעות למפעיל אחר. עם זאת, ניתן להשתמש בפונקציות סינון בהצהרות סינון.

הדוגמה שלהלן מיישמת את הפונקציה IsEmpty:

ייבא java.io.IOException ייבא java.util.Map ייבא org.apache.pig.FilterFunc יבוא org.apache.pig.PigException ייבא org.apache.pig.backend.executionengine.ExecException ייבא org.apache.pig.data.DataBag יבוא org.apache.pig.data.Tuple יבוא org.apache.pig.data.DataType / ** * קבע אם תיק או מפה ריקים. * / class class IsEmpty מרחיב את FilterFunc {@Override exec בוליאני ציבורי (קלט Tuple) זורק IOException {נסה {Object values ​​= input.get (0) if (values ​​instanceof DataBag) return ((DataBag) ערכים). size () == 0 אחר אם (ערכי מופע של מפה) מחזירים ((מפה) ערכים). גודל () == 0 אחר {int errCode = 2102 מחרוזת msg = 'לא יכול לבדוק' + DataType.findTypeName (ערכים) + 'לריקנות.' לזרוק ExecException חדש (msg, errCode, PigException.BUG)}} לתפוס (ExecException ee) {לזרוק ee}}}