Получение java.lang.UnsupportedOperationException: невозможно оценить выражение в Pyspark

В середине моего проекта я застрял с этим Unsupported Operation Exception. Вот мой сценарий, я создал udf под названием filter и зарегистрировал его как fnGetChargeInd. эта функция принимает 4 параметра: временную метку в Юникоде, которая уже отформатирована из запросов, как тип datetime, частота строки, начало строки и период строки. этим он вычисляет chargeAmt и возвращает значение типа Integer. вот мой код функции udf

def filter(startdate, frequency, begmonth, testperiod):
    startdatestring = startdate.strftime("%Y-%m-%d")
    # print "startdatestring->", startdatestring
    startdateyearstring = startdatestring[0:4]
    startdatemonthstring = startdatestring[5:7]
    # print "startdateyearstring->", startdateyearstring
    startdateyearint = int(startdateyearstring)
    startdatemonthint = int(startdatemonthstring)
    # print "startdateyearint is->", startdateyearint
    # print "startdateyearinttype", type(startdateyearint)
    currYear = startdateyearint
    currMonth = startdatemonthint
    currperiod = startdateyearstring + startdatemonthstring
    if (frequency == 'M'):
        return 1
    if (frequency == 'S' or frequency == 'A' and begmonth != None):
        currMonth = int(begmonth)
        print"in if statement", currMonth
    # check nextperiod calculation
    if (currperiod == testperiod):
        return 1
    if (currperiod > testperiod):
        return 0
    if (frequency == 'Q'):
        currMonth = currMonth + 3
    if (frequency == 'S'):
        currMonth = currMonth + 1
    if (currMonth > 12):
        currMonth = currMonth - 12
        currYear = currYear + 1
    return 0

и это мой код TimestampConversion для форматирования Unicode как datetime

def StringtoTimestamp(datetext):
    if(datetext==None):
        return None
    else:
        datevalue = datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")
        return datevalue

spark.udf.register('TimestampConvert',lambda datetext:StringtoTimestamp(datetext),TimestampType())

spark.udf.register("fnGetChargeInd",lambda x,y,z,timeperiod:filter(x,y,z,timeperiod),IntegerType())

теперь после этого я запросил таблицу расчета chargeAmt

spark.sql("select b.ENTITYID as ENTITYID, cm.BLDGID as BldgID,cm.LEASID as LeaseID,coalesce(l.SUITID,(select EmptyDefault from EmptyDefault)) as SuiteID,(select CurrDate from CurrDate) as TxnDate,cm.INCCAT as IncomeCat,'??' as SourceCode,(Select CurrPeriod from CurrPeriod)as Period,coalesce(case when cm.DEPARTMENT ='@' then 'null' else cm.DEPARTMENT end, null) as Dept,'Lease' as ActualProjected ,fnGetChargeInd(TimestampConvert(cm.EFFDATE),cm.FRQUENCY,cm.BEGMONTH,('select CurrPeriod from CurrPeriod'))*coalesce (cm.AMOUNT,0) as  ChargeAmt,0 as OpenAmt,cm.CURRCODE as CurrencyCode,case when ('PERIOD.DATACLSD') is null then 'Open' else 'Closed' end as GLClosedStatus,'Unposted'as GLPostedStatus ,'Unpaid' as PaidStatus,cm.FRQUENCY as Frequency,0 as RetroPD from CMRECC cm join BLDG b on cm.BLDGID =b.BLDGID join LEAS l on cm.BLDGID =l.BLDGID and cm.LEASID =l.LEASID and (l.VACATE is null or l.VACATE >= ('select CurrDate from CurrDate')) and (l.EXPIR >= ('select CurrDate from CurrDate') or l.EXPIR < ('select RunDate from RunDate')) left outer join PERIOD on b.ENTITYID =  PERIOD.ENTITYID and ('select CurrPeriod from CurrPeriod')=PERIOD.PERIOD where ('select CurrDate from CurrDate')>=cm.EFFDATE  and (select CurrDate from CurrDate) <= coalesce(cm.EFFDATE,cast(date_add(( select min(cm2.EFFDATE) from CMRECC cm2 where cm2.BLDGID = cm.BLDGID and cm2.LEASID = cm.LEASID and cm2.INCCAT = cm.INCCAT and 'cm2.EFFDATE' > 'cm.EFFDATE'),-1) as timestamp)  ,case when l.EXPIR <(select RunDate from RunDate)then (Select RunDate from RunDate) else l.EXPIR end)").show()

Он идеально рассчитывает chargeAmt  введите описание изображения здесь

Я сохранил этот результат во временной таблице Fact_Temp сейчас ВОЗНИКНОВЕНИЕ ПРОБЛЕМЫ. Я хотел запросить отфильтрованную таблицу, где я получу данные после удаления строки, где ActualProjected = Lease и ChargeAmt = 0

spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease' and ChargeAmt='0')").show()

это дает мне исключение

java.lang.UnsupportedOperationException: невозможно оценить выражение: fnGetChargeInd (TimestampConvert (input [0, string, true]), input [1, string, true], input [2, string, true], выберите CurrPeriod из CurrPeriod)

я понял, что chargeAmt не принимает никакого значения, потому что если я делаю запрос без этого условия, он работает хорошо

spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease')").show()

это дает мне ожидаемый EmptyTable. Логически я думаю, что значение chargeAMt установлено в таблице после расчета, и я зарегистрировал эту таблицу, поэтому значение сохраняется. поэтому, когда я запрашиваю сохраненную таблицу. Я понятия не имею, почему здесь вызывается функция. Я уже видел этот пост в stackoverflow UnsupportedOperationException: Невозможно вычислить выражение: .. при добавлении нового столбца withColumn () и udf () для понимания, но хотя мой случай здесь другой. я пробовал printtschema dataframe, я видел только схему этого соблазнительного

Как я могу решить эту проблему, мы высоко ценим любое руководство. я что-то упустил в моем коде. Пожалуйста, помогите мне. Я использую Pyspark 2.0 заранее спасибо Kalyan

введите здесь описание изображения


person Kalyan    schedule 02.03.2017    source источник


Ответы (1)


хорошо, пока я понял, что это ошибка искры 2.0. эта следующая ссылка решила мою проблему https://issues.apache.org/jira/browse/SPARK-17100

Я перешел с 2.0 на 2.1.0, и у меня это сработало.

person Kalyan    schedule 02.03.2017