Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: accept job object as argument toget_job andcancel_job#617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
gcf-merge-on-green merged 2 commits intogoogleapis:masterfromtswast:job_or_id
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletionsgoogle/cloud/bigquery/client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -1734,12 +1734,20 @@ def get_job(
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

Args:
job_id (str): Unique job identifier.
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.

Keyword Arguments:
project (Optional[str]):
ID of the project which owns the job (defaults to the client's project).
location (Optional[str]): Location where the job was run.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
Expand All@@ -1757,6 +1765,10 @@ def get_job(
"""
extra_params = {"projection": "full"}

project, location, job_id = _extract_job_reference(
job_id, project=project, location=location
)

if project is None:
project = self.project

Expand DownExpand Up@@ -1791,12 +1803,20 @@ def cancel_job(
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel

Args:
job_id (str): Unique job identifier.
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.

Keyword Arguments:
project (Optional[str]):
ID of the project which owns the job (defaults to the client's project).
location (Optional[str]): Location where the job was run.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
Expand All@@ -1814,6 +1834,10 @@ def cancel_job(
"""
extra_params = {"projection": "full"}

project, location, job_id = _extract_job_reference(
job_id, project=project, location=location
)

if project is None:
project = self.project

Expand DownExpand Up@@ -3518,6 +3542,37 @@ def _item_to_table(iterator, resource):
return TableListItem(resource)


def _extract_job_reference(job, project=None, location=None):
"""Extract fully-qualified job reference from a job-like object.

Args:
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.
project (Optional[str]):
Project where the job was run. Ignored if ``job_id`` is a job
object.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.

Returns:
Tuple[str, str, str]: ``(project, location, job_id)``
"""
if hasattr(job, "job_id"):
project = job.project
job_id = job.job_id
location = job.location
else:
job_id = job

return (project, location, job_id)


def _make_job_id(job_id, prefix=None):
"""Construct an ID for a new job.

Expand Down
11 changes: 6 additions & 5 deletionstests/system/test_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -189,7 +189,9 @@ def test_get_service_account_email(self):
def _create_bucket(self, bucket_name, location=None):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
retry_storage_errors(bucket.create)(location=location)
retry_storage_errors(storage_client.create_bucket)(
bucket_name, location=location
)
self.to_delete.append(bucket)

return bucket
Expand DownExpand Up@@ -872,7 +874,7 @@ def test_load_table_from_file_w_explicit_location(self):
job_id = load_job.job_id

# Can get the job from the EU.
load_job = client.get_job(job_id, location="EU")
load_job = client.get_job(load_job)
self.assertEqual(job_id, load_job.job_id)
self.assertEqual("EU", load_job.location)
self.assertTrue(load_job.exists())
Expand All@@ -889,7 +891,7 @@ def test_load_table_from_file_w_explicit_location(self):

# Can cancel the job from the EU.
self.assertTrue(load_job.cancel())
load_job = client.cancel_job(job_id, location="EU")
load_job = client.cancel_job(load_job)
self.assertEqual(job_id, load_job.job_id)
self.assertEqual("EU", load_job.location)

Expand DownExpand Up@@ -1204,8 +1206,7 @@ def test_query_w_timeout(self):
# Even though the query takes >1 second, the call to getQueryResults
# should succeed.
self.assertFalse(query_job.done(timeout=1))

Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location)
self.assertIsNotNone(Config.CLIENT.cancel_job(query_job))

def test_query_w_page_size(self):
page_size = 45
Expand Down
43 changes: 28 additions & 15 deletionstests/unit/test_client.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -2933,31 +2933,30 @@ def test_get_job_miss_w_explict_project(self):
conn = client._connection = make_connection()

with self.assertRaises(NotFound):
client.get_job(JOB_ID, project=OTHER_PROJECT, location=self.LOCATION)
client.get_job(JOB_ID, project=OTHER_PROJECT)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/OTHER_PROJECT/jobs/NONESUCH",
query_params={"projection": "full", "location": self.LOCATION},
query_params={"projection": "full"},
timeout=None,
)

def test_get_job_miss_w_client_location(self):
from google.cloud.exceptions import NotFound

OTHER_PROJECT = "OTHER_PROJECT"
JOB_ID = "NONESUCH"
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds, location=self.LOCATION)
client = self._make_one("client-proj", creds, location="client-loc")
conn = client._connection = make_connection()

with self.assertRaises(NotFound):
client.get_job(JOB_ID, project=OTHER_PROJECT)
client.get_job(JOB_ID)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/OTHER_PROJECT/jobs/NONESUCH",
query_params={"projection": "full", "location":self.LOCATION},
path="/projects/client-proj/jobs/NONESUCH",
query_params={"projection": "full", "location":"client-loc"},
timeout=None,
)

Expand All@@ -2971,7 +2970,11 @@ def test_get_job_hit_w_timeout(self):
QUERY = "SELECT * from test_dataset:test_table"
ASYNC_QUERY_DATA = {
"id": "{}:{}".format(self.PROJECT, JOB_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": "query_job"},
"jobReference": {
"projectId": "resource-proj",
"jobId": "query_job",
"location": "us-east1",
},
"state": "DONE",
"configuration": {
"query": {
Expand All@@ -2989,18 +2992,21 @@ def test_get_job_hit_w_timeout(self):
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds)
conn = client._connection = make_connection(ASYNC_QUERY_DATA)
job_from_resource = QueryJob.from_api_repr(ASYNC_QUERY_DATA, client)

job = client.get_job(JOB_ID, timeout=7.5)
job = client.get_job(job_from_resource, timeout=7.5)

self.assertIsInstance(job, QueryJob)
self.assertEqual(job.job_id, JOB_ID)
self.assertEqual(job.project, "resource-proj")
self.assertEqual(job.location, "us-east1")
self.assertEqual(job.create_disposition, CreateDisposition.CREATE_IF_NEEDED)
self.assertEqual(job.write_disposition, WriteDisposition.WRITE_TRUNCATE)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/PROJECT/jobs/query_job",
query_params={"projection": "full"},
path="/projects/resource-proj/jobs/query_job",
query_params={"projection": "full", "location": "us-east1"},
timeout=7.5,
)

Expand DownExpand Up@@ -3049,25 +3055,32 @@ def test_cancel_job_hit(self):
QUERY = "SELECT * from test_dataset:test_table"
QUERY_JOB_RESOURCE = {
"id": "{}:{}".format(self.PROJECT, JOB_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": "query_job"},
"jobReference": {
"projectId": "job-based-proj",
"jobId": "query_job",
"location": "asia-northeast1",
},
"state": "RUNNING",
"configuration": {"query": {"query": QUERY}},
}
RESOURCE = {"job": QUERY_JOB_RESOURCE}
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds)
conn = client._connection = make_connection(RESOURCE)
job_from_resource = QueryJob.from_api_repr(QUERY_JOB_RESOURCE, client)

job = client.cancel_job(JOB_ID)
job = client.cancel_job(job_from_resource)

self.assertIsInstance(job, QueryJob)
self.assertEqual(job.job_id, JOB_ID)
self.assertEqual(job.project, "job-based-proj")
self.assertEqual(job.location, "asia-northeast1")
self.assertEqual(job.query, QUERY)

conn.api_request.assert_called_once_with(
method="POST",
path="/projects/PROJECT/jobs/query_job/cancel",
query_params={"projection": "full"},
path="/projects/job-based-proj/jobs/query_job/cancel",
query_params={"projection": "full", "location": "asia-northeast1"},
timeout=None,
)

Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp