Skip to content

Commit bb68480

Browse files
committed
python: add comprehensive checkpoint test
We didn't have reasonable coverage for checkpoints making sure that we don't break it in a backward incompatible way. This test attempts to fix that. Signed-off-by: Gerd Zellweger <[email protected]>
1 parent 7fda869 commit bb68480

3 files changed

Lines changed: 594 additions & 31 deletions

File tree

python/tests/platform/test_checkpoint_sync.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
import random
32
import sys
43
import time
@@ -14,16 +13,15 @@
1413
)
1514
from tests import enterprise_only
1615
from tests.shared_test_pipeline import SharedTestPipeline
16+
from tests.utils import (
17+
MINIO_BUCKET,
18+
MINIO_ENDPOINT,
19+
MINIO_REGION,
20+
required_env,
21+
)
1722

1823
from .helper import wait_for_condition
1924

20-
DEFAULT_ENDPOINT = os.environ.get(
21-
"DEFAULT_MINIO_ENDPOINT", "http://minio.extra.svc.cluster.local:9000"
22-
)
23-
DEFAULT_BUCKET = os.environ.get("DEFAULT_MINIO_BUCKET", "default")
24-
ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minio")
25-
SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "miniopasswd")
26-
2725

2826
def storage_cfg(
2927
pipeline_name: str,
@@ -38,12 +36,18 @@ def storage_cfg(
3836
retention_min_age: int = 0,
3937
read_bucket: Optional[str] = None,
4038
) -> dict:
39+
# MinIO credentials are read here (not at import time) so collection
40+
# does not blow up in environments where they are unset.
41+
access_key = required_env("CI_K8S_MINIO_ACCESS_KEY_ID")
42+
secret_key = required_env("CI_K8S_MINIO_SECRET_ACCESS_KEY")
43+
4144
sync: dict = {
42-
"bucket": f"{DEFAULT_BUCKET}/{pipeline_name}",
43-
"access_key": ACCESS_KEY,
44-
"secret_key": SECRET_KEY if not auth_err else SECRET_KEY + "extra",
45+
"bucket": f"{MINIO_BUCKET}/{pipeline_name}",
46+
"access_key": access_key,
47+
"secret_key": secret_key if not auth_err else secret_key + "extra",
4548
"provider": "Minio",
46-
"endpoint": endpoint or DEFAULT_ENDPOINT,
49+
"endpoint": endpoint or MINIO_ENDPOINT,
50+
"region": MINIO_REGION,
4751
"start_from_checkpoint": start_from_checkpoint,
4852
"fail_if_no_checkpoint": strict,
4953
"standby": standby,
@@ -606,7 +610,7 @@ def test_read_bucket(
606610
uuid = source.sync_checkpoint(wait=True)
607611
source.stop(force=True)
608612

609-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
613+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
610614

611615
# Step 2: start the main pipeline with an empty bucket and read_bucket
612616
# pointing at the source.
@@ -705,7 +709,7 @@ def test_bucket_preferred_over_read_bucket(self):
705709
source.sync_checkpoint(wait=True)
706710
source.stop(force=True)
707711

708-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
712+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
709713

710714
# Step 2: push a checkpoint to the main pipeline's own bucket.
711715
storage_config = storage_cfg(self.pipeline.name)
@@ -782,7 +786,7 @@ def test_standby_bucket_takes_over_from_read_bucket(self):
782786
source.sync_checkpoint(wait=True)
783787
source.stop(force=True)
784788

785-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
789+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
786790

787791
# Step 2: start the main pipeline; it will push newer checkpoints to
788792
# its own bucket during the test.
@@ -976,7 +980,7 @@ def test_local_priority_over_read_bucket(self):
976980
source.sync_checkpoint(wait=True)
977981
source.stop(force=True)
978982

979-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
983+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
980984

981985
# Step 2: main pipeline takes a LOCAL-ONLY checkpoint (never synced).
982986
# Its own S3 bucket stays empty, ensuring the only remote source of data
@@ -1061,7 +1065,7 @@ def test_local_priority_over_read_bucket_from_uuid(self):
10611065
source.sync_checkpoint(wait=True)
10621066
source.stop(force=True)
10631067

1064-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
1068+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
10651069

10661070
# Step 2: main pipeline takes a LOCAL-ONLY checkpoint (never synced).
10671071
# Main bucket stays empty; source_bucket (read_bucket) has a checkpoint
@@ -1130,7 +1134,7 @@ def test_read_bucket_strict_fail(self):
11301134
ft = FaultToleranceModel.AtLeastOnce
11311135

11321136
# A bucket path with no checkpoints.
1133-
empty_read_bucket = f"{DEFAULT_BUCKET}/{self.pipeline.name}_read_bucket_empty"
1137+
empty_read_bucket = f"{MINIO_BUCKET}/{self.pipeline.name}_read_bucket_empty"
11341138

11351139
storage_config = storage_cfg(
11361140
self.pipeline.name,
@@ -1174,7 +1178,7 @@ def test_bucket_preferred_over_read_bucket_from_uuid(self):
11741178
source.sync_checkpoint(wait=True)
11751179
source.stop(force=True)
11761180

1177-
source_bucket = f"{DEFAULT_BUCKET}/{source.name}"
1181+
source_bucket = f"{MINIO_BUCKET}/{source.name}"
11781182

11791183
# Step 2: main pipeline creates and syncs its own checkpoint.
11801184
storage_config = storage_cfg(self.pipeline.name)

0 commit comments

Comments
 (0)