PubSub to BigQuery via Dataflow with GCP

2021/11/163 min read
bookmark this
Responsive image

GCP Pub/Sub is managed messaging service provide by GCP, this blog demo how to create Pub/Sub and use dataflow to subscribe data into BigQuery.

Create BQ Table for test

First, need to create a BQ table, so we can store our message from Pub/Sub we send, below is the JSON and BQ Table assume we want to store the information.

{“id”: 100, “name”:”Smith James", url:”https://testsmithjames.com”,”age”:30,”price”:300,”desc”:”I like sell my arts.”}

PubSub Topic

Here, we try to create a topic via gcloud command line, 

To create a PubSub Topic

gcloud pubsub topics create test_topic_myname

In case, need to delete the topic, below command is how to delete a PubSub Topic

gcloud pubsub topics delete test_topic_myname

Create a Dataflow

Now, the following command example will create a dataflow job by using the template PubSub to BigQuery and will insert the new record to the Big Query table defined.

This command also expects the cloud bucket already exists.

gcloud dataflow jobs run ps-to-bq-test-topic-myname --gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery --region us-central1 --staging-location gs://test-pubsub-name-topic/temp --parameters inputTopic=projects/test-dataflow-sql/topics/test_topic_myname,outputTableSpec=test-dataflow-sql:dataflow_sql_tutorial.test_pubsub_to_bq

Manually Publish PubSub Message

Let's test the PubSub Dataflow to BigQuery, go to the GCP Pub/Sub page, and paste the below Json, once click the publish button should see the new record in BigQuery table.

{
  "id": 100,
  "name": "smith james",
  "url": "https://testsmithjames.com",
  "age": 30,
  "price": 300,
  "desc": "I like sell my arts"
}

Send Pub/Sub Message from C#

Following is a sample code by using C# to send a message to the projecId and topicId.

TopicName topicName = TopicName.FromProjectTopic(ProjectId, TopicId);

PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

SamplePubSubModel sampleModel = new()
{
                id = 304,
                name = "my name",
                url = "https://test-url",
                age = 40,
                price = 300,
                desc = "some desc"
};

await publisher.PublishAsync(JsonConvert.SerializeObject(sampleModel));

Above C# code require the Google.Cloud.PubSub.V1, but also need to authenticate then the code will successfully run.

Authenticating as a service account

Now, just run the C# code above, and you'll see the exception which requires providing the google credentials. There're a few ways to achieve this, what we're doing here is trying to create a service account that has permission to pub/sub message.

Once create a service account, make sure has the Role Pub/Sub Publisher.

Create a token and save the credential JSON file to the local, if you're using windows, run the below command. Now, after following the below command, might still see the credential error but, it might be fixed after restarting the windows.

set GOOGLE_APPLICATION_CREDENTIALS=key_path

Conclusion

Should be very easy to set up all processes, but might take some time on how the google application credentials especially if you're using windows.