4 changes: 4 additions & 0 deletions flink-python/pyflink/fn_execution/coder_impl_fast.pxd
@@ -148,6 +148,10 @@ cdef class TimestampCoderImpl(FieldCoderImpl):

cdef _decode_timestamp_data_from_stream(self, InputStream in_stream)

cdef _to_utc_timestamp(self, value)

cdef _to_datetime(self, int64_t seconds, int32_t microseconds)

cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cdef object _timezone

22 changes: 19 additions & 3 deletions flink-python/pyflink/fn_execution/coder_impl_fast.pyx
@@ -685,8 +685,9 @@ cdef class TimestampCoderImpl(FieldCoderImpl):
cpdef encode_to_stream(self, value, OutputStream out_stream):
cdef int32_t microseconds_of_second, nanoseconds
cdef int64_t timestamp_seconds, timestamp_milliseconds
timestamp_seconds = <int64_t> (value.replace(tzinfo=datetime.timezone.utc).timestamp())
microseconds_of_second = value.microsecond
utc_ts = self._to_utc_timestamp(value)
timestamp_seconds = <int64_t> (utc_ts.timestamp())
microseconds_of_second = utc_ts.microsecond
timestamp_milliseconds = timestamp_seconds * 1000 + microseconds_of_second // 1000
nanoseconds = microseconds_of_second % 1000 * 1000
if self._is_compact:
@@ -709,7 +710,15 @@ cdef class TimestampCoderImpl(FieldCoderImpl):
nanoseconds = in_stream.read_int32()
seconds = milliseconds // 1000
microseconds = milliseconds % 1000 * 1000 + nanoseconds // 1000
return datetime.datetime.utcfromtimestamp(seconds).replace(microsecond=microseconds)
return self._to_datetime(seconds, microseconds)

cdef _to_utc_timestamp(self, value):
return value.replace(tzinfo=datetime.timezone.utc)

cdef _to_datetime(self, int64_t seconds, int32_t microseconds):
return datetime.datetime.utcfromtimestamp(seconds).replace(microsecond=microseconds)

cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
"""
@@ -723,6 +732,13 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cpdef decode_from_stream(self, InputStream in_stream, size_t size):
return self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream))

cdef _to_utc_timestamp(self, value):
return value.astimezone(datetime.timezone.utc)

cdef _to_datetime(self, int64_t seconds, int32_t microseconds):
return (datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
.replace(microsecond=microseconds).astimezone(self._timezone))

cdef class InstantCoderImpl(FieldCoderImpl):
"""
A coder for Instant.
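For readers following the Cython change: a minimal pure-Python sketch of the conversion helpers introduced above (illustrative only, not the actual Cython code; a pytz-style timezone object is assumed for the local-zoned case).

import datetime
import pytz

def to_utc_timestamp_naive(value):
    # TIMESTAMP (no time zone): interpret the naive datetime as UTC wall time.
    return value.replace(tzinfo=datetime.timezone.utc)

def to_utc_timestamp_aware(value):
    # TIMESTAMP_LTZ: convert the aware datetime into UTC.
    return value.astimezone(datetime.timezone.utc)

def to_datetime_utc(seconds, microseconds):
    # TIMESTAMP: decode into a naive datetime representing UTC wall time.
    return datetime.datetime.utcfromtimestamp(seconds).replace(microsecond=microseconds)

def to_datetime_local(seconds, microseconds, local_tz):
    # TIMESTAMP_LTZ: decode into an aware datetime in the session time zone.
    return (datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
            .replace(microsecond=microseconds)
            .astimezone(local_tz))

# Round trip: encoding an aware Tokyo timestamp and decoding it again yields the same instant.
tokyo = pytz.timezone("Asia/Tokyo")
original = datetime.datetime(2018, 3, 11, 3, 0, 0, 123000, datetime.timezone.utc).astimezone(tokyo)
utc = to_utc_timestamp_aware(original)
decoded = to_datetime_local(int(utc.timestamp()), utc.microsecond, tokyo)
assert decoded == original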
17 changes: 12 additions & 5 deletions flink-python/pyflink/fn_execution/coder_impl_slow.py
@@ -570,8 +570,7 @@ def decode_from_stream(self, in_stream: InputStream, length=0):
nanoseconds = in_stream.read_int32()
return self.internal_to_timestamp(milliseconds, nanoseconds)

@staticmethod
def timestamp_to_internal(timestamp):
def timestamp_to_internal(self, timestamp):
seconds = int(timestamp.replace(tzinfo=datetime.timezone.utc).timestamp())
microseconds_of_second = timestamp.microsecond
milliseconds = seconds * 1000 + microseconds_of_second // 1000
@@ -593,10 +592,18 @@ def __init__(self, precision, timezone):
super(LocalZonedTimestampCoderImpl, self).__init__(precision)
self.timezone = timezone

def timestamp_to_internal(self, timestamp):
seconds = int(timestamp.astimezone(datetime.timezone.utc).timestamp())
microseconds_of_second = timestamp.microsecond
milliseconds = seconds * 1000 + microseconds_of_second // 1000
nanoseconds = microseconds_of_second % 1000 * 1000
return milliseconds, nanoseconds

def internal_to_timestamp(self, milliseconds, nanoseconds):
return self.timezone.localize(
super(LocalZonedTimestampCoderImpl, self).internal_to_timestamp(
milliseconds, nanoseconds))
return (super(LocalZonedTimestampCoderImpl, self).internal_to_timestamp(
milliseconds, nanoseconds)
.replace(tzinfo=datetime.timezone.utc)
.astimezone(self.timezone))


class InstantCoderImpl(FieldCoderImpl):
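For reference, the internal representation used by timestamp_to_internal / internal_to_timestamp above is epoch milliseconds plus the sub-millisecond remainder in nanoseconds. A small worked example in plain Python (values are illustrative, not taken from the tests):

import datetime

ts = datetime.datetime(2018, 3, 11, 3, 0, 0, 123456, tzinfo=datetime.timezone.utc)

# Encode: epoch milliseconds plus the sub-millisecond part in nanoseconds.
seconds = int(ts.astimezone(datetime.timezone.utc).timestamp())
microseconds_of_second = ts.microsecond
milliseconds = seconds * 1000 + microseconds_of_second // 1000    # 1520737200123
nanoseconds = microseconds_of_second % 1000 * 1000                # 456000

# Decode: reverse the split back into seconds and microseconds.
restored_seconds = milliseconds // 1000
restored_microseconds = milliseconds % 1000 * 1000 + nanoseconds // 1000
assert restored_seconds == seconds and restored_microseconds == 123456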
16 changes: 6 additions & 10 deletions flink-python/pyflink/table/tests/test_types.py
@@ -35,7 +35,7 @@
_create_type_verifier, UserDefinedType, DataTypes, Row, RowField,
RowType, ArrayType, BigIntType, VarCharType, MapType, DataType,
_from_java_data_type, ZonedTimestampType,
LocalZonedTimestampType, _to_java_data_type)
_to_java_data_type)
from pyflink.testing.test_case_utils import PyFlinkTestCase


@@ -547,20 +547,16 @@ def test_local_zoned_timestamp_type(self):
last_abbreviation = DataTypes.TIMESTAMP_LTZ()
self.assertEqual(lztst, last_abbreviation)

ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000)
ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000,
datetime.timezone.utc)
self.assertEqual(0, lztst.to_sql_type(ts))

import pytz
# suppose the timezone of the data is +9:00
timezone = pytz.timezone("Asia/Tokyo")
orig_epoch = LocalZonedTimestampType.EPOCH_ORDINAL
try:
# suppose the local timezone is +8:00
LocalZonedTimestampType.EPOCH_ORDINAL = 28800000000
ts_tokyo = timezone.localize(ts)
self.assertEqual(-3600000000, lztst.to_sql_type(ts_tokyo))
finally:
LocalZonedTimestampType.EPOCH_ORDINAL = orig_epoch
# suppose the local timezone is +8:00
ts_tokyo = ts.astimezone(timezone)
self.assertEqual(0, lztst.to_sql_type(ts_tokyo))

if sys.version_info >= (3, 6):
ts2 = lztst.from_sql_type(0)
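The updated test relies on the difference between pytz's localize() and datetime's astimezone(): localize() attaches a zone to a naive datetime without shifting the wall clock, whereas astimezone() converts an aware datetime to another zone while preserving the instant. A short sketch (pytz assumed, as in the test):

import datetime
import pytz

tokyo = pytz.timezone("Asia/Tokyo")

# Aware UTC epoch converted to Tokyo time: same instant, rendered as 09:00+09:00.
epoch_utc = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)
assert epoch_utc.astimezone(tokyo).timestamp() == 0.0

# Localizing the naive wall-clock 1970-01-01 00:00:00 pins it to +09:00,
# which is a different instant (nine hours before the epoch).
naive = datetime.datetime(1970, 1, 1)
assert tokyo.localize(naive).timestamp() == -9 * 3600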
27 changes: 19 additions & 8 deletions flink-python/pyflink/table/tests/test_udf.py
@@ -364,6 +364,13 @@ def timestamp_func(timestamp_param):
'timestamp_param is wrong value %s !' % timestamp_param
return timestamp_param

@udf(result_type=DataTypes.TIMESTAMP_LTZ(3))
def timestamp_ltz_func(timestamp_ltz_param):
from datetime import datetime, timezone
assert timestamp_ltz_param == datetime(2018, 3, 11, 3, 0, 0, 123000, timezone.utc), \
'timestamp_ltz_param is wrong value %s !' % timestamp_ltz_param
return timestamp_ltz_param

@udf(result_type=DataTypes.ARRAY(DataTypes.BIGINT()))
def array_func(array_param):
assert array_param == [[1, 2, 3]] or array_param == ((1, 2, 3),), \
@@ -427,7 +434,8 @@ def varchar_func(varchar_param):
q DECIMAL(38, 18),
r BINARY(5),
s CHAR(7),
t VARCHAR(10)
t VARCHAR(10),
u TIMESTAMP_LTZ(3)
) WITH ('connector'='test-sink')
"""
self.t_env.execute_sql(sink_table_ddl)
@@ -441,7 +449,8 @@ def varchar_func(varchar_param):
datetime.datetime(2018, 3, 11, 3, 0, 0, 123000), [[1, 2, 3]],
{1: 'flink', 2: 'pyflink'}, decimal.Decimal('1000000000000000000.05'),
decimal.Decimal('1000000000000000000.05999999999999999899999999999'),
bytearray(b'flink'), 'pyflink', 'pyflink')],
bytearray(b'flink'), 'pyflink', 'pyflink',
datetime.datetime(2018, 3, 11, 3, 0, 0, 123000, datetime.timezone.utc))],
DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
DataTypes.FIELD("b", DataTypes.BIGINT()),
@@ -462,7 +471,8 @@ def varchar_func(varchar_param):
DataTypes.FIELD("q", DataTypes.DECIMAL(38, 18)),
DataTypes.FIELD("r", DataTypes.BINARY(5)),
DataTypes.FIELD("s", DataTypes.CHAR(7)),
DataTypes.FIELD("t", DataTypes.VARCHAR(10))]))
DataTypes.FIELD("t", DataTypes.VARCHAR(10)),
DataTypes.FIELD("u", DataTypes.TIMESTAMP_LTZ(3))]))

t.select(
bigint_func(t.a),
@@ -484,7 +494,8 @@ def varchar_func(varchar_param):
decimal_cut_func(t.q),
binary_func(t.r),
char_func(t.s),
varchar_func(t.t)) \
varchar_func(t.t),
timestamp_ltz_func(t.u)) \
.execute_insert(sink_table).wait()
actual = source_sink_utils.results()
# Currently the sink result precision of DataTypes.TIME(precision) only supports 0.
@@ -494,7 +505,7 @@ def varchar_func(varchar_param):
"2018-03-11T03:00:00.123, [1, 2, 3], "
"{1=flink, 2=pyflink}, 1000000000000000000.050000000000000000, "
"1000000000000000000.059999999999999999, [102, 108, 105, 110, 107], "
"pyflink, pyflink]"])
"pyflink, pyflink, 2018-03-11T03:00:00.123Z]"])

def test_all_data_types(self):
def boolean_func(bool_param):
@@ -971,9 +982,7 @@ def eval(self, col):
"non-callable-udf", udf(Plus(), DataTypes.BIGINT(), DataTypes.BIGINT()))

def test_data_types(self):
timezone = self.t_env.get_config().get_local_timezone()
local_datetime = pytz.timezone(timezone).localize(
datetime.datetime(1970, 1, 1, 0, 0, 0, 123000))
local_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 123000, datetime.timezone.utc)

@udf(result_type=DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
def local_zoned_timestamp_func(local_zoned_timestamp_param):
@@ -1161,6 +1170,8 @@ def echo(i: str):
if __name__ == '__main__':
import unittest

os.environ['_python_worker_execution_mode'] = "loopback"

try:
import xmlrunner

16 changes: 11 additions & 5 deletions flink-python/pyflink/table/types.py
@@ -33,6 +33,7 @@

from pyflink.common.types import _create_row
from pyflink.util.api_stability_decorators import PublicEvolving
from pyflink.util.exceptions import TableException
from pyflink.util.java_utils import to_jarray, is_instance_of
from pyflink.java_gateway import get_gateway
from pyflink.common import Row, RowKind
@@ -498,14 +499,19 @@ def need_conversion(self):

def to_sql_type(self, dt):
if dt is not None:
seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
if dt.tzinfo is None:
raise TableException(
f"""The input field {dt} does not specify time zone but its SQL type \
TIMESTAMP_LTZ requires TIME ZONE. Please use TIMESTAMP instead or use CAST \
function to cast TIMESTAMP as TIMESTAMP_LTZ."""
)
seconds = calendar.timegm(dt.utctimetuple())
return int(seconds) * 10 ** 6 + dt.microsecond

def from_sql_type(self, ts):
if ts is not None:
ts = ts - self.EPOCH_ORDINAL
return datetime.datetime.fromtimestamp(ts // 10 ** 6).replace(microsecond=ts % 10 ** 6)
return (datetime.datetime.fromtimestamp(ts // 10 ** 6, datetime.timezone.utc)
.replace(microsecond=ts % 10 ** 6))


class ZonedTimestampType(AtomicType):
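A standalone sketch of the new LocalZonedTimestampType conversion above (illustrative only; the real code raises TableException and lives on the type class): a time-zone-aware datetime is encoded as epoch microseconds in UTC, naive input is rejected, and decoding produces an aware UTC datetime.

import calendar
import datetime

def to_sql_type(dt):
    # Naive datetimes are rejected; TIMESTAMP_LTZ encodes an instant, not wall time.
    if dt.tzinfo is None:
        raise ValueError("TIMESTAMP_LTZ input must carry time zone information")
    seconds = calendar.timegm(dt.utctimetuple())
    return int(seconds) * 10 ** 6 + dt.microsecond

def from_sql_type(ts):
    return (datetime.datetime.fromtimestamp(ts // 10 ** 6, datetime.timezone.utc)
            .replace(microsecond=ts % 10 ** 6))

dt = datetime.datetime(2018, 3, 11, 3, 0, 0, 123000, datetime.timezone.utc)
micros = to_sql_type(dt)
assert from_sql_type(micros) == dt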
1 change: 1 addition & 0 deletions flink-python/pyflink/testing/test_case_utils.py
@@ -164,6 +164,7 @@ def setUpClass(cls):
super(PyFlinkStreamTableTestCase, cls).setUpClass()
cls.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
cls.env.set_parallelism(2)
os.environ['_python_worker_execution_mode'] = "loopback"
cls.t_env = StreamTableEnvironment.create(cls.env)
cls.t_env.get_config().set("python.fn-execution.bundle.size", "1")
