#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (C) 2012-17 Dr. Ralf Schlatterbeck Open Source Consulting.
# Reichergasse 131, A-3411 Weidling.
# Web: http://www.runtux.com Email: office@runtux.com
# All rights reserved
# ****************************************************************************
# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU Library General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
# ****************************************************************************
from __future__ import print_function
from __future__ import unicode_literals
import sys
import re
import csv
from datetime import datetime, time, tzinfo, timedelta
from rsclib.stateparser import Parser, Parse_Error
from rsclib.autosuper import autosuper
from rsclib.pycompat import ustr
class TZ (tzinfo) :
    """ Fixed-offset timezone whose offset from UTC is a whole number
        of hours. There is no daylight saving time handling: dst
        always yields a zero timedelta.
    """

    def __init__ (self, offset = 0) :
        """ Store the UTC offset in hours.
            The offset may be an integer or a decimal string with
            optional sign (e.g. '2', '+02', '-5'). Strings that are
            not a valid base-10 integer raise ValueError.
        """
        # int (x, 10) only accepts strings, so an integer offset
        # (including the default 0) has to bypass the explicit base.
        if isinstance (offset, int) :
            self.offset = offset
        else :
            self.offset = int (offset, 10)
    # end def __init__

    def utcoffset (self, dt = None) :
        """ Offset from UTC as a timedelta; dt is ignored. """
        return timedelta (hours = self.offset)
    # end def utcoffset

    def dst (self, dt = None) :
        """ No daylight saving time: always a zero timedelta. """
        return timedelta (0)
    # end def dst

    def tzname (self, dt = None) :
        """ Name of the zone as sign and two-digit hour offset, e.g.
            '+02' or '-05'; dt is ignored.
        """
        # Work on the stored hour offset directly: timedelta.seconds
        # is never negative (a negative delta is normalized to
        # days = -1 plus positive seconds), so it cannot be used to
        # derive the sign.
        sign = '+'
        if self.offset < 0 :
            sign = '-'
        name = '%s%02d' % (sign, abs (self.offset))
        # The datetime module wants the native string type here (byte
        # string on python2, text on python3); str () yields that on
        # both versions for this pure-ASCII name.
        return str (name)
    # end def tzname

    def __str__ (self) :
        return "TZ (%s)" % self.offset
    # end def __str__
    __repr__ = __str__

# end class TZ
# Lookup table from the boolean literals of an sql dump to python
# booleans; keys are byte strings.
sql_bool = dict (((b't', True), (b'f', False)))
class SQL_Dialect_Postgres (autosuper) :
    """ SQL Dialect for Postgres """
    # Byte string that stands for SQL NULL in this dialect (a backslash
    # followed by N); the format methods of the SQL types return it
    # when the value to format is None.
    sql_null = b'\\N'
# end class SQL_Dialect_Postgres
class SQL_Dialect_Mysql (autosuper) :
    """ SQL Dialect for Mysql """
    # Byte string that stands for SQL NULL in this dialect; the format
    # methods of the SQL types return it when the value to format is
    # None.
    sql_null = b'NULL'
# end class SQL_Dialect_Mysql
# Module-level dialect objects, passed as the `dialect` argument of the
# format methods of the SQL types.
dialect_pg = SQL_Dialect_Postgres ()
# NOTE(review): unlike dialect_pg this name is bound to the class
# itself, not to an instance (no call parentheses). Looking up the
# class attribute sql_null works either way -- confirm whether an
# instance was intended before changing it.
dialect_my = SQL_Dialect_Mysql
class SQL_Type (autosuper) :
    """ Base class of the SQL column types.
        An instance keeps the parameters given in the type
        declaration and can render a python value back into the
        representation used in a dump (format) as well as the type
        declaration itself (typ).
    """

    def __init__ (self, *p) :
        """ Remember the positional type parameters as a tuple. """
        self.parameters = p
    # end def __init__

    def format (self, dialect, typ, value) :
        """ Render value for output.
            None is rendered as the NULL marker of the dialect.
            If the instance has a non-empty 'dtfmt' attribute the value
            is formatted with strftime using that format. Otherwise, if
            it has a non-empty 'timefmt' attribute, strftime with that
            format is used and trailing zeros (and a then-trailing
            decimal dot) are removed. Everything else is rendered with
            repr. The typ argument is not used here.
        """
        if value is None :
            return dialect.sql_null
        # special case for date/time formats
        dtfmt = getattr (self, 'dtfmt', None)
        if dtfmt :
            return value.strftime (dtfmt)
        timefmt = getattr (self, 'timefmt', None)
        if timefmt :
            # We remove trailing 0s in the microsecond part and remove
            # the decimal dot if the microsecond part was all zero to
            # make it roundtrip.
            # strftime returns the native string type (byte string on
            # python2, text on python3), the characters to strip must
            # be of the same type: stripping with a bytes literal
            # raises a TypeError on python3. str () of an ASCII
            # literal yields the native type on both versions.
            formatted = value.strftime (timefmt)
            return formatted.rstrip (str ('0')).rstrip (str ('.'))
        return repr (value)
    # end def format

    def typ (self, tn) :
        """ Return the type declaration for type name tn: the name
            followed by the comma-separated parameters in parentheses,
            or just the name if there are no parameters.
        """
        if self.parameters :
            return "%s(%s)" % (tn, ','.join (self.parameters))
        return tn
    # end def typ

# end class SQL_Type
class SQL_boolean (SQL_Type) :
    """ Parse boolean from sql dump and return as python bool.
        The NULL markers of both dialects yield None.
    >>> b = SQL_boolean ()
    >>> b (b'\\N')
    >>> b (b'NULL')
    >>> b (b'f')
    False
    >>> b (b't')
    True
    """

    def __call__ (self, b) :
        """ Convert the dump literal b to True, False or None; an
            unknown literal raises KeyError from the lookup table.
        """
        if b in (b'\\N', b'NULL') :
            return None
        return sql_bool [b]
    # end def __call__

    def format (self, dialect, typ, value) :
        """ Render value as the boolean literal of a dump, None is
            rendered as the NULL marker of the dialect.
        """
        if value is None :
            return dialect.sql_null
        return b't' if value else b'f'
    # end def format

# end class SQL_boolean
class SQL_double (SQL_Type) :
    """ Parse a floating point value from an sql dump.
        The NULL markers of both dialects yield None, everything else
        is converted with float (which raises ValueError for input
        that is not a number).
    """

    def __call__ (self, f) :
        # Accept the NULL markers as byte string and as text: on
        # python3 the two never compare equal, so checking only the
        # text form would hand a bytes marker to float () and fail
        # with a ValueError.
        if f in (b'\\N', b'NULL', '\\N', 'NULL') :
            return None
        return float (f)
    # end def __call__

# end class SQL_double
class SQL_integer (SQL_Type) :
    """ Parse an integer value from an sql dump.
        The NULL markers of both dialects yield None, everything else
        is converted with int (which raises ValueError for input that
        is not an integer).
    """

    def __call__ (self, i) :
        # Accept the NULL markers as byte string and as text: on
        # python3 the two never compare equal, so checking only the
        # text form would hand a bytes marker to int () and fail with
        # a ValueError.
        if i in (b'\\N', b'NULL', '\\N', 'NULL') :
            return None
        return int (i)
    # end def __call__

# end class SQL_integer

# The other integer types are handled by the very same class.
SQL_bigint = SQL_smallint = SQL_integer
class SQL_numeric (SQL_Type) :
    """ Fixed-point number from an sql dump.
        A parsed value is a pair (integer part, fractional part) of
        python integers; format renders such a pair with the fractional
        part zero-padded to fractional_part_len digits.
    """

    def __init__ (self, integer_part_len, fractional_part_len) :
        """ Both type parameters are converted to int, so they may be
            given as strings.
        """
        self.integer_part_len    = int (integer_part_len)
        self.fractional_part_len = int (fractional_part_len)
    # end def __init__

    def __call__ (self, n) :
        """ Parse n into a pair of integers, the NULL markers of both
            dialects yield None. A value without decimal point gets a
            fractional part of 0. Raises ValueError if one of the parts
            is not an integer.
        """
        # Accept the NULL markers as byte string and as text: on
        # python3 the two never compare equal.
        if n in (b'\\N', b'NULL', '\\N', 'NULL') :
            return None
        # Splitting a byte string at a text '.' is a TypeError on
        # python3, so work on text from here on.
        if isinstance (n, bytes) :
            n = n.decode ('ascii')
        # partition (unlike unpacking the result of split) also copes
        # with a value that has no decimal point.
        int_part, dot, frac_part = n.partition ('.')
        # NOTE(review): the sign only survives in the integer part, a
        # value like '-0.5' comes out as (0, 5) -- confirm that such
        # values do not occur in the dumps processed.
        return (int (int_part), int (frac_part or '0'))
    # end def __call__

    def format (self, dialect, typ, value) :
        """ Render the pair value as byte string, None is rendered as
            the NULL marker of the dialect. The typ argument is not
            used here.
        """
        if value is None :
            return dialect.sql_null
        # Two-stage formatting: first build the format with the width
        # of the fractional part, then apply it to the pair.
        fmt = b"%%d.%%0%dd" % self.fractional_part_len
        return fmt % value
    # end def format

    def typ (self, tn) :
        """ Return the type declaration for type name tn with both
            length parameters in parentheses.
        """
        il = self.integer_part_len
        fl = self.fractional_part_len
        return "%s(%s,%s)" % (tn, il, fl)
    # end def typ

# end class SQL_numeric
# Data for regression testing of SQL_character: each table is a tuple
# of pairs (byte string as found in a dump, expected unicode string).
# The tables are checked in a loop in the doctest of SQL_character
# instead of being spelled out there: doctest needs backslashes in
# strings escaped, and accented characters are output as
# backslash-escapes in python2 but as rendered strings in python3 by
# default, so the expected output could not be written portably.

# Checked with the charset set to 'latin1'.
broken_strings_latin1 = \
( (b'\xd6ffnungswinkel', '\xd6ffnungswinkel')
,
)
# Checked with the default charset 'utf-8'.
broken_strings_utf8 = \
( (b'\xc3\x96ffnungswinkel', '\xd6ffnungswinkel')
,
)
# Checked with charset 'utf-8' and fix_double_encode enabled.
broken_strings_utf8_double = \
( ( b'\xc3\x83\xc2\xa4\xc3\x83\xc2\xb6\xc3\x83\xc2\xbc\xc3\x83'
b'\xc2\x84\xc3\x83\xc2\x96\xc3\x83\xc2\x9c\xc3\x83\xc2\x9f'
, '\xe4\xf6\xfc\xc4\xd6\xdc\xdf'
)
, ( b'Conrad von H\xc3\x83\xc2\xb6tzendorf Stra\xc3\x83\xc5\xb8e'
, 'Conrad von H\xf6tzendorf Stra\xdfe'
)
, ( b'Josefst\xc4\x82\xc2\xa4dter Stra\xc4\x82\xc5\xbae'
, 'Josefst\xe4dter Stra\xdfe'
)
, ( b'Josefst\xc4\x82\xc2\xa4dter Stra\xc3\x83\xc2\x9fe'
, 'Josefst\xe4dter Stra\xdfe'
)
, ( b'Sch\xc4\x82\xc2\xb6nburgstra\xc4\x82\xc5\xbae'
, 'Sch\xf6nburgstra\xdfe'
)
, ( b'Wei\xc3\x83\xc2\x9fgerberl\xc4\x82\xc2\xa4nde'
, 'Wei\xdfgerberl\xe4nde'
)
, ( b'M\xc4\x82\xc4\xbdller'
, 'M\xfcller'
)
, ( b'\xc4\x82\xe2\x80\x93'
, '\xd6'
)
, ( b'M\xc4\x8f\xc5\xbc\xcb\x9dller'
, 'M\xfcller'
)
# NOTE(review): this entry is identical to the previous one.
, ( b'M\xc4\x8f\xc5\xbc\xcb\x9dller'
, 'M\xfcller'
)
# The same byte sequence is expected to yield a different character in
# the entries below (u-umlaut in some, sharp s in others).
, ( b'Thaliastra\xc4\x8f\xc5\xbc\xcb\x9de'
, 'Thaliastra\xdfe'
)
, ( b'F\xc4\x8f\xc5\xbc\xcb\x9dnfhaus'
, 'F\xfcnfhaus'
)
, ( b'Putzingerstra\xc4\x8f\xc5\xbc\xcb\x9de'
, 'Putzingerstra\xdfe'
)
, ( b'H\xc4\x8f\xc5\xbc\xcb\x9dtteldorf'
, 'H\xfctteldorf'
)
, ( b'Hollandstra\xc4\x8f\xc5\xbc\xcb\x9de'
, 'Hollandstra\xdfe'
)
, ( b'Margareteng\xc4\x8f\xc5\xbc\xcb\x9drtel'
, 'Margareteng\xfcrtel'
)
)
class SQL_character (SQL_Type) :
""" Get string from sql dump and convert to unicode.
>>> sq = SQL_character ()
>>> for k, v in broken_strings_utf8 :
... if sq (k) != v :
... print (repr (sq (k)), repr (v))
>>> sq.charset = 'latin1'
>>> for k, v in broken_strings_latin1 :
... if sq (k) != v :
... print (repr (sq (k)), repr (v))
>>> sq.charset = 'utf-8'
>>> sq.fix_double_encode = True
>>> for k, v in broken_strings_utf8_double :
... if sq (k) != v :
... print (repr (sq (k)), repr (v))
"""
charset = 'utf-8'
fix_double_encode = False # enabling this makes sense only for utf-8
re_double = re.compile (r'\xc3\x83|\x82\xc2|\xc5|\xc4\x82')
def __call__ (self, s) :
if s == b'\\N' or s == b'NULL' :
return None
if self.charset == 'utf-8' and self.fix_double_encode :
# Don't know ho