Немного поясню свой вопрос:
Вот мой код
def averagePerAttributeFlow[T] = Flow[T]
.groupBy(10000, {
case User(id, location, age) =>
location match {
case Location(city,state,country) =>
println("MATCHING BY CITY")
city+state+country
case Location("n/a",state,country) =>
println("MATCHING BY STATE")
state+country
}
case UserBookRating(userId, bookISBN, rating) => userId
})
У меня есть данные, поступающие из файла csv. Формат некоторых строк такой:
"Чикаго, Иллинойс, США"
Однако некоторые строки выглядят так:
", Иллинойс, США"
В некоторых строках даже есть данные о мусоре для городов:
"###, Иллинойс, США"
Как я могу сгруппировать по всем строкам, содержащим illinois, usa, и не обращать внимания на то, указан ли город или нет?
Когда пользователь вводит «Чикаго, Иллинойс, США», группировка работает, но когда пользователь вводит «н/д, Иллинойс, США», группировка не работает. n/a означает, что пользователю не важен город
РЕДАКТИРОВАТЬ:
Итак, я понял, что это может быть мой поток фильтрации, который работает не так, как я ожидал. Вот код:
def filterByAttributeFlow[T](attr: String) = Flow[T].filter( {
case user: User =>
val attrSplit = attr.split(",").map(_.trim())
attrSplit match {
case Array(city,state,country) =>
user.location.isCity(city, state, country) // check if location matches
case Array(_,state,country) =>
println("Filtering by State")
user.location.isState(state,country)
}
case bookRating: UserBookRating =>
bookRating.userId.contentEquals(attr) // check if userId matches
}).withAttributes(ActorAttributes.supervisionStrategy {
e: Throwable =>
system.log.error(s"Error filtering ratings by '$attr': {}", e)
Supervision.Resume // skips the erroneous data and resumes the stream
})
Как я могу отфильтровать и сохранить все записи данных, которые содержат illinois, usa, даже если город не указан или заполнен ненужными данными?