вставить дополнительные строки в результат запроса sql

Учитывая таблицу с записями с нерегулярными отметками времени, разрывы должны быть вставлены через регулярные 5-минутные интервалы (связанные данные могут/будут иметь значение NULL).

Я думал о том, чтобы получить время начала, сделать подзапрос с оконной функцией и добавить 5-минутные интервалы к времени начала, но я мог думать только об использовании row_number для увеличения значений.

WITH data as(
select id, data,
cast(date_and_time as double) * 1000 as time_milliseconds
from t1), -- original data

start_times as(
select id, MIN(CAST(date_and_time as double) * 1000) as start_time
from t1
GROUP BY id
), -- first timestamp for each id

boundries as (
SELECT T1.id,(row_number() OVER (PARTITION BY T1.id ORDER BY T1.date_and_time)-1) *300000 + start_times.start_time
as boundry
from T1
INNER JOIN start_times ON start_times.id= T1.id
) -- increment the number of 5 min added on each row and later full join boundries table with original data

Однако это ограничивает меня количеством строк, присутствующих для идентификатора в исходной таблице данных, и если временные метки разбросаны, количество строк не может покрыть количество 5-минутных интервалов, которые необходимо добавить.

образец данных:

initial data:

 |-----------|------------------|------------------|
 |   id      |     value        |    timestamp     |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01.011  | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30.041  |
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20.231  |
 |-----------|------------------|------------------|
 |     1     |    3             |    15:00:00.312  |

data after my query:

 |-----------|------------------|------------------|
 |   id      |     value        | timestamp (UNIX) |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01      | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:05:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:10:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:15:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:20:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|  <-- Jumping directly to 15:00:00 (WRONG! :( need to insert more 5 min breaks here )
 |     1     |    3             |    15:00:00      |  



Я думал о создании временной таблицы внутри HIVE и заполнении ее строками x, представляющими 5-минутные интервалы от времени начала до времени окончания таблицы данных, но я не мог найти никакого способа сделать это.

Любой способ использования для циклов? Мы ценим любые предложения.

Спасибо


person habarnam    schedule 16.02.2021    source источник
comment
пожалуйста, предоставьте пример данных и результат   -  person leftjoin    schedule 16.02.2021
comment
спасибо за комментарий, я добавил пример, надеюсь здесь будет понятнее   -  person habarnam    schedule 16.02.2021
comment
Является ли это реальным форматом метки времени, как в столбце date_and_time? пожалуйста, облегчите воспроизведение   -  person leftjoin    schedule 16.02.2021
comment
ага, но с миллисекундами 12:00:01.321 и т.д. надо было их добавить, чтобы было понятно зачем умножение   -  person habarnam    schedule 16.02.2021
comment
И это также должно быть частью даты, верно?   -  person leftjoin    schedule 16.02.2021
comment
Итак, логика такова: если между предыдущей временной меткой и текущей временной меткой более 5 минут, то должна быть вставлена ​​запись с 5 минутами, верно? Или это просто всегда должны быть записи с 05, 10, 15 ? Почему между 12:12:20 и 15:00:00 вы вставили только две записи?   -  person leftjoin    schedule 16.02.2021
comment
Мне действительно не нужно определять текущую и предыдущую разницу: если я могу просто разделить время начала и время окончания исходной таблицы на 5-минутные интервалы, а затем добавить их в свой запрос, этого достаточно   -  person habarnam    schedule 16.02.2021
comment
Давайте продолжим обсуждение в чате.   -  person habarnam    schedule 16.02.2021
comment
@habarnam Я добавил предложение и пример использования UDF для создания вашей временной таблицы и средства для репликации. Дайте мне знать, если это работает для вас   -  person ggordon    schedule 16.02.2021


Ответы (2)


Вы можете попробовать вычислить разницу между текущей временной меткой и следующей, разделить 300, чтобы получить количество диапазонов, создать строку пробелов с длиной = num_ranges, взорвать для создания строк.

Демо:

with your_table as (--initial data example
select stack (3,
1,3 ,'2020-01-01 12:00:01.011', 
1,4 ,'2020-01-01 12:03:30.041',
1,5 ,'2020-01-01 12:20:20.231' 
) as (id ,value ,ts )
)


select id ,value, ts, next_ts,
        diff_sec,num_intervals,
       from_unixtime(unix_timestamp(ts)+h.i*300) new_ts, coalesce(from_unixtime(unix_timestamp(ts)+h.i*300),ts) as calculated_timestamp
from
(
 select id ,value ,ts, next_ts, (unix_timestamp(next_ts)-unix_timestamp(ts))  diff_sec,  
 floor((unix_timestamp(next_ts)-unix_timestamp(ts))/300 --diff in seconds/5 min
                                         ) num_intervals
from
(  
select id ,value ,ts, lead(ts) over(order by ts) next_ts
  from your_table
) s
)s
  lateral view outer posexplode(split(space(cast(s.num_intervals as int)),' ')) h as i,x --this will generate rows

Результат:

id  value   ts                      next_ts                 diff_sec    num_intervals   new_ts              calculated_timestamp
1   3       2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209          0              2020-01-01 12:00:01 2020-01-01 12:00:01
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:03:30 2020-01-01 12:03:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:08:30 2020-01-01 12:08:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:13:30 2020-01-01 12:13:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:18:30 2020-01-01 12:18:30
1   5       2020-01-01 12:20:20.231 \N                      \N           \N             \N                  2020-01-01 12:20:20.231

Добавлены дополнительные строки. Все промежуточные столбцы я оставил для отладки.

person leftjoin    schedule 16.02.2021
comment
Документы @habarnam: cwiki.apache .org/confluence/display/Hive/ и cwiki.apache. org/confluence/display/Hive/ - person leftjoin; 17.02.2021

Здесь может помочь рекурсивный запрос, но hive не поддерживает эту дополнительную информацию.

Вы можете рассмотреть возможность создания таблицы вне улья или написания пользовательской функции.

В любом случае этот запрос может быть дорогостоящим, и в зависимости от вашей частоты рекомендуется использовать материализованные представления/таблицы.

В примере показана пользовательская функция inbetween, созданная с использованием pyspark для выполнения запроса. Это

  1. генерировать значения между минимальной и максимальной отметкой времени из набора данных
  2. используя CTEs и UDF для создания временной таблицы intervals
  3. создание всех возможных интервалов с использованием дорогостоящего перекрестного соединения в possible_records
  4. Использование левого соединения для извлечения записей с фактическими значениями (для демонстрации я представил значение метки времени как просто строку времени)

В приведенном ниже коде показано, как он оценивался с помощью куста.

Пример кода


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,ArrayType

inbetween = lambda min_value,max_value :  [*range(min_value,max_value,5*60)]

udf_inbetween = udf(inbetween,ArrayType(IntegerType()))
sqlContext.udf.register("inbetween",udf_inbetween)

sqlContext.sql("""
WITH max_timestamp(t) as (
  select max(timestamp) as t from initial_data2
),
min_timestamp(t) as (
  select min(timestamp) as t from initial_data2
),
intervals as (
   select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
   min_timestamp mint, max_timestamp maxt
),
unique_ids as (
  select distinct id from initial_data2
),
interval_times as (
   select interval_time from (
   select 
       cast(from_unixtime(interval_time) as timestamp) as interval_time 
   from 
       intervals
   UNION
   select distinct d.timestamp as interval_time from initial_data2 d
   )
   order by interval_time asc
),
possible_records as (
   select
      distinct 
      d.id,
      i.interval_time
   FROM
      interval_times i, unique_ids d
   
)
select 
    p.id,
    d.value,
    split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
  possible_records p
LEFT JOIN
   initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time

ORDER BY p.id, p.interval_time
""").show(20)


Выход

+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
|  1|    3| 12:00:01|
|  1|    4| 12:03:30|
|  1| null| 12:05:01|
|  1| null| 12:10:01|
|  1|    5| 12:12:20|
|  1| null| 12:15:01|
|  1| null| 12:20:01|
|  1| null| 12:25:01|
|  1| null| 12:30:01|
|  1| null| 12:35:01|
|  1| null| 12:40:01|
|  1| null| 12:45:01|
|  1| null| 12:50:01|
|  1| null| 12:55:01|
|  1| null| 13:00:01|
|  1| null| 13:05:01|
|  1| null| 13:10:01|
|  1| null| 13:15:01|
|  1| null| 13:20:01|
|  1| null| 13:25:01|
+---+-----+---------+

показаны только первые 20 строк

Подготовка данных для репликации


raw_data1 = [
    {"id":1,"value":3,"timestam":"12:00:01"},
    {"id":1,"value":4,"timestam":"12:03:30"},
    {"id":1,"value":5,"timestam":"12:12:20"},
    {"id":1,"value":3,"timestam":"15:00:00"},
]
raw_data = [*map(lambda entry : Row(**entry),raw_data1)]

initial_data = sqlContext.createDataFrame(raw_data,schema="id int, value int, timestam string ")
initial_data.createOrReplaceTempView('initial_data')

sqlContext.sql("create or replace temp view initial_data2 as select id,value,cast(timestam as timestamp) as timestamp from initial_data")

person ggordon    schedule 16.02.2021
comment
@ggordon, не могли бы вы поделиться номером своей версии искры? - person habarnam; 18.02.2021
comment
@habarnam Это было сделано с использованием версии 3.0.1. - person ggordon; 18.02.2021