В середине моего проекта я застрял с этим Unsupported Operation Exception. Вот мой сценарий, я создал udf под названием filter и зарегистрировал его как fnGetChargeInd. эта функция принимает 4 параметра: временную метку в Юникоде, которая уже отформатирована из запросов, как тип datetime, частота строки, начало строки и период строки. этим он вычисляет chargeAmt и возвращает значение типа Integer. вот мой код функции udf
def filter(startdate, frequency, begmonth, testperiod):
startdatestring = startdate.strftime("%Y-%m-%d")
# print "startdatestring->", startdatestring
startdateyearstring = startdatestring[0:4]
startdatemonthstring = startdatestring[5:7]
# print "startdateyearstring->", startdateyearstring
startdateyearint = int(startdateyearstring)
startdatemonthint = int(startdatemonthstring)
# print "startdateyearint is->", startdateyearint
# print "startdateyearinttype", type(startdateyearint)
currYear = startdateyearint
currMonth = startdatemonthint
currperiod = startdateyearstring + startdatemonthstring
if (frequency == 'M'):
return 1
if (frequency == 'S' or frequency == 'A' and begmonth != None):
currMonth = int(begmonth)
print"in if statement", currMonth
# check nextperiod calculation
if (currperiod == testperiod):
return 1
if (currperiod > testperiod):
return 0
if (frequency == 'Q'):
currMonth = currMonth + 3
if (frequency == 'S'):
currMonth = currMonth + 1
if (currMonth > 12):
currMonth = currMonth - 12
currYear = currYear + 1
return 0
и это мой код TimestampConversion для форматирования Unicode как datetime
def StringtoTimestamp(datetext):
if(datetext==None):
return None
else:
datevalue = datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")
return datevalue
spark.udf.register('TimestampConvert',lambda datetext:StringtoTimestamp(datetext),TimestampType()
)
spark.udf.register("fnGetChargeInd",lambda x,y,z,timeperiod:filter(x,y,z,timeperiod),IntegerType())
теперь после этого я запросил таблицу расчета chargeAmt
spark.sql("select b.ENTITYID as ENTITYID, cm.BLDGID as BldgID,cm.LEASID as LeaseID,coalesce(l.SUITID,(select EmptyDefault from EmptyDefault)) as SuiteID,(select CurrDate from CurrDate) as TxnDate,cm.INCCAT as IncomeCat,'??' as SourceCode,(Select CurrPeriod from CurrPeriod)as Period,coalesce(case when cm.DEPARTMENT ='@' then 'null' else cm.DEPARTMENT end, null) as Dept,'Lease' as ActualProjected ,fnGetChargeInd(TimestampConvert(cm.EFFDATE),cm.FRQUENCY,cm.BEGMONTH,('select CurrPeriod from CurrPeriod'))*coalesce (cm.AMOUNT,0) as ChargeAmt,0 as OpenAmt,cm.CURRCODE as CurrencyCode,case when ('PERIOD.DATACLSD') is null then 'Open' else 'Closed' end as GLClosedStatus,'Unposted'as GLPostedStatus ,'Unpaid' as PaidStatus,cm.FRQUENCY as Frequency,0 as RetroPD from CMRECC cm join BLDG b on cm.BLDGID =b.BLDGID join LEAS l on cm.BLDGID =l.BLDGID and cm.LEASID =l.LEASID and (l.VACATE is null or l.VACATE >= ('select CurrDate from CurrDate')) and (l.EXPIR >= ('select CurrDate from CurrDate') or l.EXPIR < ('select RunDate from RunDate')) left outer join PERIOD on b.ENTITYID = PERIOD.ENTITYID and ('select CurrPeriod from CurrPeriod')=PERIOD.PERIOD where ('select CurrDate from CurrDate')>=cm.EFFDATE and (select CurrDate from CurrDate) <= coalesce(cm.EFFDATE,cast(date_add(( select min(cm2.EFFDATE) from CMRECC cm2 where cm2.BLDGID = cm.BLDGID and cm2.LEASID = cm.LEASID and cm2.INCCAT = cm.INCCAT and 'cm2.EFFDATE' > 'cm.EFFDATE'),-1) as timestamp) ,case when l.EXPIR <(select RunDate from RunDate)then (Select RunDate from RunDate) else l.EXPIR end)").show()
Он идеально рассчитывает chargeAmt
Я сохранил этот результат во временной таблице Fact_Temp сейчас ВОЗНИКНОВЕНИЕ ПРОБЛЕМЫ. Я хотел запросить отфильтрованную таблицу, где я получу данные после удаления строки, где ActualProjected = Lease и ChargeAmt = 0
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease' and ChargeAmt='0')").show()
это дает мне исключение
java.lang.UnsupportedOperationException: невозможно оценить выражение: fnGetChargeInd (TimestampConvert (input [0, string, true]), input [1, string, true], input [2, string, true], выберите CurrPeriod из CurrPeriod)
я понял, что chargeAmt не принимает никакого значения, потому что если я делаю запрос без этого условия, он работает хорошо
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease')").show()
это дает мне ожидаемый EmptyTable. Логически я думаю, что значение chargeAMt установлено в таблице после расчета, и я зарегистрировал эту таблицу, поэтому значение сохраняется. поэтому, когда я запрашиваю сохраненную таблицу. Я понятия не имею, почему здесь вызывается функция. Я уже видел этот пост в stackoverflow UnsupportedOperationException: Невозможно вычислить выражение: .. при добавлении нового столбца withColumn () и udf () для понимания, но хотя мой случай здесь другой. я пробовал printtschema dataframe, я видел только схему этого соблазнительного
Как я могу решить эту проблему, мы высоко ценим любое руководство. я что-то упустил в моем коде. Пожалуйста, помогите мне. Я использую Pyspark 2.0 заранее спасибо Kalyan