This commit is contained in:
2025-05-22 20:25:38 +02:00
parent 09f6750c2b
commit ce03fbf12f
529 changed files with 3353 additions and 3312 deletions

View File

@@ -18,7 +18,6 @@ from ... import literal_column
from ... import Numeric
from ... import select
from ... import String
from ...dialects.postgresql import BYTEA
from ...types import LargeBinary
from ...types import UUID
from ...types import Uuid
@@ -104,6 +103,15 @@ class InsertBehaviorTest(fixtures.TablesTest):
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
)
Table(
"no_implicit_returning",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("data", String(50)),
implicit_returning=False,
)
Table(
"includes_defaults",
metadata,
@@ -119,6 +127,33 @@ class InsertBehaviorTest(fixtures.TablesTest):
),
)
@testing.variation("style", ["plain", "return_defaults"])
@testing.variation("executemany", [True, False])
def test_no_results_for_non_returning_insert(
self, connection, style, executemany
):
"""test another INSERT issue found during #10453"""
table = self.tables.no_implicit_returning
stmt = table.insert()
if style.return_defaults:
stmt = stmt.return_defaults()
if executemany:
data = [
{"data": "d1"},
{"data": "d2"},
{"data": "d3"},
{"data": "d4"},
{"data": "d5"},
]
else:
data = {"data": "d1"}
r = connection.execute(stmt, data)
assert not r.returns_rows
@requirements.autoincrement_insert
def test_autoclose_on_insert(self, connection):
r = connection.execute(
@@ -394,7 +429,7 @@ class ReturningTest(fixtures.TablesTest):
True,
testing.requires.float_or_double_precision_behaves_generically,
),
(Float(), 8.5514, False),
(Float(), 8.5514, True),
(
Float(8),
8.5514,
@@ -517,7 +552,6 @@ class ReturningTest(fixtures.TablesTest):
b"this is binary",
),
("LargeBinary2", LargeBinary(), b"7\xe7\x9f"),
("PG BYTEA", BYTEA(), b"7\xe7\x9f", testing.only_on("postgresql")),
argnames="type_,value",
id_="iaa",
)

View File

@@ -287,6 +287,65 @@ class HasIndexTest(fixtures.TablesTest):
)
class BizarroCharacterFKResolutionTest(fixtures.TestBase):
"""tests for #10275"""
__backend__ = True
@testing.combinations(
("id",), ("(3)",), ("col%p",), ("[brack]",), argnames="columnname"
)
@testing.variation("use_composite", [True, False])
@testing.combinations(
("plain",),
("(2)",),
("per % cent",),
("[brackets]",),
argnames="tablename",
)
def test_fk_ref(
self, connection, metadata, use_composite, tablename, columnname
):
tt = Table(
tablename,
metadata,
Column(columnname, Integer, key="id", primary_key=True),
test_needs_fk=True,
)
if use_composite:
tt.append_column(Column("id2", Integer, primary_key=True))
if use_composite:
Table(
"other",
metadata,
Column("id", Integer, primary_key=True),
Column("ref", Integer),
Column("ref2", Integer),
sa.ForeignKeyConstraint(["ref", "ref2"], [tt.c.id, tt.c.id2]),
test_needs_fk=True,
)
else:
Table(
"other",
metadata,
Column("id", Integer, primary_key=True),
Column("ref", ForeignKey(tt.c.id)),
test_needs_fk=True,
)
metadata.create_all(connection)
m2 = MetaData()
o2 = Table("other", m2, autoload_with=connection)
t1 = m2.tables[tablename]
assert o2.c.ref.references(t1.c[0])
if use_composite:
assert o2.c.ref2.references(t1.c[1])
class QuotedNameArgumentTest(fixtures.TablesTest):
run_create_tables = "once"
__backend__ = True
@@ -3053,6 +3112,7 @@ __all__ = (
"ComponentReflectionTestExtra",
"TableNoColumnsTest",
"QuotedNameArgumentTest",
"BizarroCharacterFKResolutionTest",
"HasTableTest",
"HasIndexTest",
"NormalizedNameTest",

View File

@@ -254,7 +254,7 @@ class ServerSideCursorsTest(
elif self.engine.dialect.driver == "pymysql":
sscursor = __import__("pymysql.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
elif self.engine.dialect.driver in ("aiomysql", "asyncmy"):
elif self.engine.dialect.driver in ("aiomysql", "asyncmy", "aioodbc"):
return cursor.server_side
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
@@ -311,7 +311,7 @@ class ServerSideCursorsTest(
True,
"SELECT 1 FOR UPDATE",
True,
testing.skip_if("sqlite"),
testing.skip_if(["sqlite", "mssql"]),
),
("text_no_ss", False, text("select 42"), False),
(

View File

@@ -66,6 +66,49 @@ class RowCountTest(fixtures.TablesTest):
eq_(rows, self.data)
@testing.variation("statement", ["update", "delete", "insert", "select"])
@testing.variation("close_first", [True, False])
def test_non_rowcount_scenarios_no_raise(
self, connection, statement, close_first
):
employees_table = self.tables.employees
# WHERE matches 3, 3 rows changed
department = employees_table.c.department
if statement.update:
r = connection.execute(
employees_table.update().where(department == "C"),
{"department": "Z"},
)
elif statement.delete:
r = connection.execute(
employees_table.delete().where(department == "C"),
{"department": "Z"},
)
elif statement.insert:
r = connection.execute(
employees_table.insert(),
[
{"employee_id": 25, "name": "none 1", "department": "X"},
{"employee_id": 26, "name": "none 2", "department": "Z"},
{"employee_id": 27, "name": "none 3", "department": "Z"},
],
)
elif statement.select:
s = select(
employees_table.c.name, employees_table.c.department
).where(employees_table.c.department == "C")
r = connection.execute(s)
r.all()
else:
statement.fail()
if close_first:
r.close()
assert r.rowcount in (-1, 3)
def test_update_rowcount1(self, connection):
employees_table = self.tables.employees

View File

@@ -28,6 +28,7 @@ from ... import Date
from ... import DateTime
from ... import Float
from ... import Integer
from ... import Interval
from ... import JSON
from ... import literal
from ... import literal_column
@@ -82,6 +83,11 @@ class _LiteralRoundTripFixture:
)
connection.execute(ins)
ins = t.insert().values(
x=literal(None, type_, literal_execute=True)
)
connection.execute(ins)
if support_whereclause and self.supports_whereclause:
if compare:
stmt = t.select().where(
@@ -108,7 +114,7 @@ class _LiteralRoundTripFixture:
)
)
else:
stmt = t.select()
stmt = t.select().where(t.c.x.is_not(None))
rows = connection.execute(stmt).all()
assert rows, "No rows returned"
@@ -118,6 +124,10 @@ class _LiteralRoundTripFixture:
value = filter_(value)
assert value in output
stmt = t.select().where(t.c.x.is_(None))
rows = connection.execute(stmt).all()
eq_(rows, [(None,)])
return run
@@ -452,6 +462,102 @@ class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
)
class IntervalTest(_LiteralRoundTripFixture, fixtures.TestBase):
__requires__ = ("datetime_interval",)
__backend__ = True
datatype = Interval
data = datetime.timedelta(days=1, seconds=4)
def test_literal(self, literal_round_trip):
literal_round_trip(self.datatype, [self.data], [self.data])
def test_select_direct_literal_interval(self, connection):
row = connection.execute(select(literal(self.data))).first()
eq_(row, (self.data,))
def test_arithmetic_operation_literal_interval(self, connection):
now = datetime.datetime.now().replace(microsecond=0)
# Able to subtract
row = connection.execute(
select(literal(now) - literal(self.data))
).scalar()
eq_(row, now - self.data)
# Able to Add
row = connection.execute(
select(literal(now) + literal(self.data))
).scalar()
eq_(row, now + self.data)
@testing.fixture
def arithmetic_table_fixture(cls, metadata, connection):
class Decorated(TypeDecorator):
impl = cls.datatype
cache_ok = True
it = Table(
"interval_table",
metadata,
Column(
"id", Integer, primary_key=True, test_needs_autoincrement=True
),
Column("interval_data", cls.datatype),
Column("date_data", DateTime),
Column("decorated_interval_data", Decorated),
)
it.create(connection)
return it
def test_arithmetic_operation_table_interval_and_literal_interval(
self, connection, arithmetic_table_fixture
):
interval_table = arithmetic_table_fixture
data = datetime.timedelta(days=2, seconds=5)
connection.execute(
interval_table.insert(), {"id": 1, "interval_data": data}
)
# Subtraction Operation
value = connection.execute(
select(interval_table.c.interval_data - literal(self.data))
).scalar()
eq_(value, data - self.data)
# Addition Operation
value = connection.execute(
select(interval_table.c.interval_data + literal(self.data))
).scalar()
eq_(value, data + self.data)
def test_arithmetic_operation_table_date_and_literal_interval(
self, connection, arithmetic_table_fixture
):
interval_table = arithmetic_table_fixture
now = datetime.datetime.now().replace(microsecond=0)
connection.execute(
interval_table.insert(), {"id": 1, "date_data": now}
)
# Subtraction Operation
value = connection.execute(
select(interval_table.c.date_data - literal(self.data))
).scalar()
eq_(value, (now - self.data))
# Addition Operation
value = connection.execute(
select(interval_table.c.date_data + literal(self.data))
).scalar()
eq_(value, (now + self.data))
class PrecisionIntervalTest(IntervalTest):
__requires__ = ("datetime_interval",)
__backend__ = True
datatype = Interval(day_precision=9, second_precision=9)
data = datetime.timedelta(days=103, seconds=4)
class _DateFixture(_LiteralRoundTripFixture, fixtures.TestBase):
compare = None
@@ -1940,6 +2046,8 @@ __all__ = (
"TextTest",
"NumericTest",
"IntegerTest",
"IntervalTest",
"PrecisionIntervalTest",
"CastTypeDecoratorTest",
"DateTimeHistoricTest",
"DateTimeCoercedToDateTimeTest",

View File

@@ -6,6 +6,7 @@ from ..schema import Column
from ..schema import Table
from ... import Integer
from ... import String
from ... import testing
class SimpleUpdateDeleteTest(fixtures.TablesTest):
@@ -58,5 +59,71 @@ class SimpleUpdateDeleteTest(fixtures.TablesTest):
[(1, "d1"), (3, "d3")],
)
@testing.variation("criteria", ["rows", "norows", "emptyin"])
@testing.requires.update_returning
def test_update_returning(self, connection, criteria):
t = self.tables.plain_pk
stmt = t.update().returning(t.c.id, t.c.data)
if criteria.norows:
stmt = stmt.where(t.c.id == 10)
elif criteria.rows:
stmt = stmt.where(t.c.id == 2)
elif criteria.emptyin:
stmt = stmt.where(t.c.id.in_([]))
else:
criteria.fail()
r = connection.execute(stmt, dict(data="d2_new"))
assert not r.is_insert
assert r.returns_rows
eq_(r.keys(), ["id", "data"])
if criteria.rows:
eq_(r.all(), [(2, "d2_new")])
else:
eq_(r.all(), [])
eq_(
connection.execute(t.select().order_by(t.c.id)).fetchall(),
[(1, "d1"), (2, "d2_new"), (3, "d3")]
if criteria.rows
else [(1, "d1"), (2, "d2"), (3, "d3")],
)
@testing.variation("criteria", ["rows", "norows", "emptyin"])
@testing.requires.delete_returning
def test_delete_returning(self, connection, criteria):
t = self.tables.plain_pk
stmt = t.delete().returning(t.c.id, t.c.data)
if criteria.norows:
stmt = stmt.where(t.c.id == 10)
elif criteria.rows:
stmt = stmt.where(t.c.id == 2)
elif criteria.emptyin:
stmt = stmt.where(t.c.id.in_([]))
else:
criteria.fail()
r = connection.execute(stmt)
assert not r.is_insert
assert r.returns_rows
eq_(r.keys(), ["id", "data"])
if criteria.rows:
eq_(r.all(), [(2, "d2")])
else:
eq_(r.all(), [])
eq_(
connection.execute(t.select().order_by(t.c.id)).fetchall(),
[(1, "d1"), (3, "d3")]
if criteria.rows
else [(1, "d1"), (2, "d2"), (3, "d3")],
)
__all__ = ("SimpleUpdateDeleteTest",)