GCP Stream Processing with Cloud Pub/Sub and Dataflow Lab
Data_Engineering_TIL(20220305)
[Overview]
Google Cloud Pub/Sub is a messaging service for exchanging event data among applications and services. A producer of data publishes messages to a Cloud Pub/Sub topic. A consumer creates a subscription to that topic. Subscribers either pull messages from a subscription or are configured as webhooks for push subscriptions. Every subscriber must acknowledge each message within a configurable window of time.
Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors.
Pub/Sub is a scalable, durable event ingestion and delivery system. Dataflow complements Pub/Sub's scalable, at-least-once delivery model with message deduplication and exactly-once, in-order processing if you use windows and buffering.
[What You’ll Do]
Read messages published to a Pub/Sub topic
Window (or group) the messages by timestamp
Write the messages to Cloud Storage
[Lab Architecture]
cloudshell
scheduler job on app engine --> pub/sub --> dataflow --> gcs
** cloudshell : used to run the commands that create and control the lab resources
[Lab Summary]
STEP 1) Create the lab resources
STEP 2) Start the Pipeline
STEP 3) Observe Job and Pipeline Progress
STEP 4) Cleanup
[Lab Details]
Open the GCP Cloud Shell and run the commands below to build the lab architecture shown above.
STEP 1) Create the lab resources
Welcome to Cloud Shell! Type "help" to get started.
To set your Cloud Platform project in this session use "gcloud config set project [PROJECT_ID]"
student_03_8b4d04139f65@cloudshell:~$ gcloud auth list
Credentialed Accounts
ACTIVE: *
ACCOUNT: student-03-8b4d04139f65@qwiklabs.net
To set the active account, run:
$ gcloud config set account `ACCOUNT`
student_03_8b4d04139f65@cloudshell:~$ gcloud config set project qwiklabs-gcp-01-7a0357624d5f
Updated property [core/project].
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ PROJECT_ID=$(gcloud config get-value project)
Your active configuration is: [cloudshell-21702]
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ BUCKET_NAME=$PROJECT_ID
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ TOPIC_ID=student-03-8b4d04139f65
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ REGION=us-central1
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ AE_REGION=us-central
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ gsutil mb gs://$BUCKET_NAME
Creating gs://qwiklabs-gcp-01-7a0357624d5f/...
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ gcloud pubsub topics create $TOPIC_ID
Created topic [projects/qwiklabs-gcp-01-7a0357624d5f/topics/student-03-8b4d04139f65].
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ gcloud app create --region=$AE_REGION
You are creating an app for project [qwiklabs-gcp-01-7a0357624d5f].
WARNING: Creating an App Engine application for a project is irreversible and the region
cannot be changed. More information about regions is at
<https://cloud.google.com/appengine/docs/locations>.
Creating App Engine application in project [qwiklabs-gcp-01-7a0357624d5f] and region [us-central]....done.
Success! The app is now created. Please use `gcloud app deploy` to deploy your first app.
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" --topic=$TOPIC_ID --message-body="Hello!"
API [cloudscheduler.googleapis.com] not enabled on project [858729720438]. Would you like to enable and retry (this will take a few minutes)? (y/N)? y
Enabling service [cloudscheduler.googleapis.com] on project [858729720438]...
Operation "operations/acf.p2-858729720438-cd1153ac-334e-49f8-8e9c-cbfb10f37679" finished successfully.
WARNING: We are using the App Engine app location (us-central1) as the default location. Please use the "--location" flag if you want to use a different location.
name: projects/qwiklabs-gcp-01-7a0357624d5f/locations/us-central1/jobs/publisher-job
pubsubTarget:
data: SGVsbG8h
topicName: projects/qwiklabs-gcp-01-7a0357624d5f/topics/student-03-8b4d04139f65
retryConfig:
maxBackoffDuration: 3600s
maxDoublings: 16
maxRetryDuration: 0s
minBackoffDuration: 5s
schedule: '* * * * *'
state: ENABLED
timeZone: Etc/UTC
userUpdateTime: '2022-03-05T07:22:27Z'
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5)$ gcloud scheduler jobs run publisher-job
WARNING: We are using the App Engine app location (us-central1) as the default location. Please use the "--location" flag if you want to use a different location.
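In the job description above, data: SGVsbG8h is simply the Base64 encoding of the --message-body value. A quick standalone check (no GCP access needed):

```python
import base64

# Cloud Scheduler stores the Pub/Sub message body Base64-encoded,
# which is why --message-body="Hello!" shows up as data: SGVsbG8h.
decoded = base64.b64decode("SGVsbG8h").decode("utf-8")
print(decoded)  # Hello!
```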
# Cloud Shell's default Python is 3.9, and running apache-beam with it fails with an unsupported-Python-version error, so install Python 3.7 or 3.8 instead.
# Install requirements
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ sudo apt-get install -y build-essential checkinstall libreadline-gplv2-dev libncursesw5-dev libssl-dev libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev zlib1g-dev openssl libffi-dev python3-dev python3-setuptools wget
# Prepare to build
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ mkdir /tmp/Python37
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ cd /tmp/Python37
# Pull down Python 3.7, build, and install
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ tar xvf Python-3.7.0.tar.xz
student_03_8b4d04139f65@cloudshell:~ (qwiklabs-gcp-01-7a0357624d5f)$ cd /tmp/Python37/Python-3.7.0
student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0 (qwiklabs-gcp-01-7a0357624d5f)$ ./configure
student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0 (qwiklabs-gcp-01-7a0357624d5f)$ sudo make altinstall
student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0 (qwiklabs-gcp-01-7a0357624d5f)$ virtualenv env -p python3.7.0
created virtual environment CPython3.7.0.final.0-64 in 1354ms
creator CPython3Posix(dest=/tmp/Python37/Python-3.7.0/env, clear=False, no_vcs_ignore=False, global=False)
seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/home/student_03_8b4d04139f65/.local/share/virtualenv)
added seed packages: pip==22.0.3, setuptools==60.6.0, wheel==0.37.1
activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0$ source env/bin/activate
(env) student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0$ python --version
Python 3.7.0
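Since the failure mode described above is only discovered at runtime, a small pre-flight check can fail fast before submitting a job. This is a sketch, not part of the lab; the supported range (3.6-3.8) is an assumption taken from the note above:

```python
# Hedged sketch: check whether the interpreter falls inside the range this
# Beam release is assumed to accept (3.6-3.8 per the note above); in practice
# you would call it with sys.version_info before launching the pipeline.
def beam_supports(version_info, lo=(3, 6), hi=(3, 9)):
    """Return True if (major, minor) lies in the assumed supported range [lo, hi)."""
    return lo <= tuple(version_info[:2]) < hi

print(beam_supports((3, 7, 0)))  # True  (the interpreter built above)
print(beam_supports((3, 9, 2)))  # False (Cloud Shell's default)
```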
STEP 2) Start the Pipeline
We will run the following sample code to start the data pipeline.
This sample code uses Dataflow to:
Read Pub/Sub messages.
Window (or group) messages into fixed-size intervals by publish timestamps.
Write the messages in each window to files in Cloud Storage.
Sample code: PubSubToGCS.py
import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )
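The output object names are assembled in WriteToGCS from the window boundaries and a shard id. FixedWindows aligns windows to the epoch, so with --window_size=2 a message published at 07:39:15 lands in the window [07:38, 07:40). A standalone sketch of that arithmetic (function names here are illustrative, not part of the sample):

```python
from datetime import datetime, timezone

def window_bounds(publish_ts, window_size_secs):
    """Fixed-window [start, end) containing a Unix timestamp; start is the
    timestamp rounded down to a multiple of the window size (epoch-aligned)."""
    start = publish_ts - (publish_ts % window_size_secs)
    return start, start + window_size_secs

def output_filename(output_path, publish_ts, window_size_secs, shard_id):
    """Rebuild the name WriteToGCS produces: <output_path>-<HH:MM>-<HH:MM>-<shard>."""
    start, end = window_bounds(publish_ts, window_size_secs)
    fmt = lambda t: datetime.fromtimestamp(t, tz=timezone.utc).strftime("%H:%M")
    return "-".join([output_path, fmt(start), fmt(end), str(shard_id)])

# A message published 2022-03-05 07:39:15 UTC, with --window_size=2 (minutes):
ts = datetime(2022, 3, 5, 7, 39, 15, tzinfo=timezone.utc).timestamp()
print(output_filename("samples/output", ts, 120, 1))
# samples/output-07:38-07:40-1
```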
Start the pipeline with the following command:
python PubSubToGCS.py \
--project=$PROJECT_ID \
--region=$REGION \
--input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output_path=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://$BUCKET_NAME/temp
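Note that this one command line mixes the script's own flags (--input_topic, --window_size, ...) with runner options (--runner, --temp_location, ...). parse_known_args is what makes that work: anything the script's parser does not recognize is passed through to PipelineOptions. A standalone illustration (not the lab script itself):

```python
import argparse

# Minimal stand-in for the parser in PubSubToGCS.py: it declares only the
# script's own flags; everything else falls through to the runner.
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--window_size", type=float, default=1.0)

argv = [
    "--input_topic", "projects/p/topics/t",
    "--window_size", "2",
    "--runner=DataflowRunner",      # unknown here -> forwarded to the runner
    "--temp_location=gs://b/temp",  # unknown here -> forwarded to the runner
]
known, pipeline_args = parser.parse_known_args(argv)
print(known.window_size)  # 2.0
print(pipeline_args)      # ['--runner=DataflowRunner', '--temp_location=gs://b/temp']
```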
(env) student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0/python-docs-samples/pubsub/streaming-analytics (qwiklabs-gcp-01-7a0357624d5f)$ python PubSubToGCS.py --project=$PROJECT_ID --region=$REGION --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID --output_path=gs://$BUCKET_NAME/samples/output --runner=DataflowRunner --window_size=2 --num_shards=2 --temp_location=gs://$BUCKET_NAME/temp
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/tmp/Python37/Python-3.7.0/env/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp19j5yx5_', 'apache-beam==2.33.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/tmp/Python37/Python-3.7.0/env/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp19j5yx5_', 'apache-beam==2.33.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.33.0-cp37-cp37m-manylinux1_x86_64.whl
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.33.0
INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37-fnapi:2.33.0
INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/python37-fnapi:2.33.0" for Docker environment
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://qwiklabs-gcp-01-7a0357624d5f/temp
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/pickled_main_session...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/pickled_main_session in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/dataflow_python_sdk.tar...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/dataflow_python_sdk.tar in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/apache_beam-2.33.0-cp37-cp37m-manylinux1_x86_64.whl...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/apache_beam-2.33.0-cp37-cp37m-manylinux1_x86_64.whl in 2 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://qwiklabs-gcp-01-7a0357624d5f/temp/beamapp-student038b4d04139f65-0305073824-781086.1646465904.781712/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
createTime: '2022-03-05T07:38:31.110117Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2022-03-04_23_38_29-6012764980959823246'
location: 'us-central1'
name: 'beamapp-student038b4d04139f65-0305073824-781086'
projectId: 'qwiklabs-gcp-01-7a0357624d5f'
stageStates: []
startTime: '2022-03-05T07:38:31.110117Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2022-03-04_23_38_29-6012764980959823246]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2022-03-04_23_38_29-6012764980959823246
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2022-03-04_23_38_29-6012764980959823246?project=qwiklabs-gcp-01-7a0357624d5f
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2022-03-04_23_38_29-6012764980959823246 is in state JOB_STATE_PENDING
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:29.926Z: JOB_MESSAGE_BASIC: Streaming Engine auto-enabled. Use --experiments=disable_streaming_engine to opt out.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:29.926Z: JOB_MESSAGE_BASIC: Dataflow Runner V2 auto-enabled. Use --experiments=disable_runner_v2 to opt out.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:31.885Z: JOB_MESSAGE_WARNING: Autoscaling is enabled for Dataflow Streaming Engine. Workers will scale between 1 and 100 unless maxNumWorkers is specified.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:32.122Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2022-03-04_23_38_29-6012764980959823246. The number of workers will be between 1 and 100.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:32.146Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2022-03-04_23_38_29-6012764980959823246.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.091Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-2 in us-central1-c.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.742Z: JOB_MESSAGE_DETAILED: Expanding SplittableParDo operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.780Z: JOB_MESSAGE_DETAILED: Expanding CollectionToSingleton operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.846Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.871Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step Window into/Group by key: GroupByKey not followed by a combiner.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.895Z: JOB_MESSAGE_DETAILED: Expanding SplittableProcessKeyed operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.923Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into streaming Read/Write steps
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:35.990Z: JOB_MESSAGE_DETAILED: Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.060Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.112Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.143Z: JOB_MESSAGE_DETAILED: Fusing consumer Window into/Window into fixed intervals into Read from Pub/Sub/Read
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.179Z: JOB_MESSAGE_DETAILED: Fusing consumer Window into/Add timestamp to windowed elements into Window into/Window into fixed intervals
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.203Z: JOB_MESSAGE_DETAILED: Fusing consumer Window into/Add key/Map(<lambda at util.py:780>) into Window into/Add timestamp to windowed elements
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.235Z: JOB_MESSAGE_DETAILED: Fusing consumer Window into/Group by key/WriteStream into Window into/Add key/Map(<lambda at util.py:780>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.268Z: JOB_MESSAGE_DETAILED: Fusing consumer Window into/Group by key/MergeBuckets into Window into/Group by key/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.302Z: JOB_MESSAGE_DETAILED: Fusing consumer Write to GCS into Window into/Group by key/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.339Z: JOB_MESSAGE_BASIC: Running job using Streaming Engine
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.370Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.399Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.431Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.464Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.526Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.555Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-central1-c...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2022-03-04_23_38_29-6012764980959823246 is in state JOB_STATE_RUNNING
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:36.620Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:38:38.039Z: JOB_MESSAGE_DETAILED: Pub/Sub resources set up for topic 'projects/qwiklabs-gcp-01-7a0357624d5f/topics/student-03-8b4d04139f65'.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:39:21.171Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 so that the pipeline can catch up with its backlog and keep up with its input rate.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:39:42.445Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:apache_beam.runners.dataflow.dataflow_runner:2022-03-05T07:39:42.472Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
^CTraceback (most recent call last):
File "PubSubToGCS.py", line 133, in <module>
pipeline_args,
File "PubSubToGCS.py", line 97, in run
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
File "/tmp/Python37/Python-3.7.0/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 587, in __exit__
self.result.wait_until_finish()
File "/tmp/Python37/Python-3.7.0/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1621, in wait_until_finish
time.sleep(5.0)
KeyboardInterrupt
After letting the pipeline run for a while, exit with Ctrl + C. This only detaches the local client; the Dataflow job itself keeps running until you stop it in STEP 4.
STEP 3) Observe Job and Pipeline Progress
Open the Dataflow console to confirm the pipeline is running, and list the bucket to see data accumulating in GCS:
(env) student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0/python-docs-samples/pubsub/streaming-analytics (qwiklabs-gcp-01-7a0357624d5f)$ gsutil ls gs://${BUCKET_NAME}/samples/
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:38-07:40-1
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:40-07:42-0
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:40-07:42-1
(env) student_03_8b4d04139f65@cloudshell:/tmp/Python37/Python-3.7.0/python-docs-samples/pubsub/streaming-analytics (qwiklabs-gcp-01-7a0357624d5f)$ gsutil ls gs://${BUCKET_NAME}/samples/
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:38-07:40-1
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:40-07:42-0
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:40-07:42-1
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:42-07:44-0
gs://qwiklabs-gcp-01-7a0357624d5f/samples/output-07:42-07:44-1
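Each object name in the listing encodes its window and shard: output-<start HH:MM>-<end HH:MM>-<shard>. A small illustrative parser for such listings (the pattern is an assumption matching the names WriteToGCS produces above):

```python
import re

# Matches names like "output-07:40-07:42-0" produced by WriteToGCS.
PATTERN = re.compile(r"output-(\d{2}:\d{2})-(\d{2}:\d{2})-(\d+)$")

def parse_output_name(gcs_path):
    """Extract (window_start, window_end, shard_id) from an output object path."""
    m = PATTERN.search(gcs_path)
    if not m:
        raise ValueError(f"unexpected object name: {gcs_path}")
    return m.group(1), m.group(2), int(m.group(3))

print(parse_output_name("gs://my-bucket/samples/output-07:40-07:42-0"))
# ('07:40', '07:42', 0)
```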
STEP 4) Cleanup
$ gcloud scheduler jobs delete publisher-job
# If prompted "Do you want to continue?", press Y and Enter.
# Press ctrl + c in your Cloud Shell if it's still busy printing output of your Dataflow job.
# In the Dataflow console, stop the job.
# With your job selected from the Dataflow Console, press the Stop button. Select the Cancel bubble to cancel the pipeline without draining.
$ gcloud pubsub topics delete $TOPIC_ID
$ gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
$ gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
$ gsutil rb gs://${BUCKET_NAME}