AWS Lambda to generate CSV file from RDS PostgreSQL

One of the requirements was to generate a CSV file for a set of queries from RDS PostgreSQL and upload the CSV file to an S3 bucket for Power BI reporting. Power BI connects to the S3 URL and generates the report. There is no gateway to connect to the PostgreSQL instance from Power BI, so we need a mechanism to upload the data to S3 so that Power BI can import it and generate reports. How can you generate a CSV file and upload it to an S3 bucket?

There are multiple ways to achieve this. One is to send a shell script via an SSM command and use PostgreSQL's copy command to generate the CSV file and push it to S3. Another approach is to use the pandas module and a dataframe to convert the data to CSV and push it to S3.

Both examples are below. I retrieved the username and password from Parameter Store. For more information about Parameter Store you may refer here.


import boto3
import time
import sys
import os

ec2 = boto3.client('ec2')
ssm_client = boto3.client('ssm')

def lambda_handler(event, context):
    def execute_ssm_command(client, commands, instance_ids):
        resp = client.send_command(
            DocumentName="AWS-RunShellScript",
            Parameters={'commands': commands},
            InstanceIds=instance_ids,
        )

    # Target node name is built from the lambda's environment variables
    nodename = os.environ['region'] + '.' + os.environ['env'] + '.' + os.environ['app'] + '.' + os.environ['company'] + '.' + os.environ['role']
    print(os.environ['date'])
    print(os.environ['code'])

    # Retrieve database credentials from Parameter Store
    uname = ssm_client.get_parameter(Name='dbusername', WithDecryption=True)
    username = uname['Parameter']['Value']
    pwd = ssm_client.get_parameter(Name='dbpassword', WithDecryption=True)
    password = pwd['Parameter']['Value']

    response = ec2.describe_instances()
    for reservation in response["Reservations"]:
        for instance in reservation["Instances"]:
            if instance.get("Tags"):
                for tag in instance['Tags']:
                    if tag.get("Key") and tag['Key'] == 'Name' and tag['Value'] == nodename:
                        print(tag['Value'])
                        commands = [
                            'if [ -z "${HOME:-}" ]; then export HOME="$(cd ~ && pwd)"; fi',
                            'sudo yum install mail -y',
                            'sudo yum install postgresql96 -y',
                            'PGPASSWORD=' + str(password) + ' psql -h postgres -U ' + str(username) + ' -d dbname -c "\\copy (select * from report where date= ' + os.environ['date'] + ' and code= ' + os.environ['code'] + ') to stdout csv header" > $HOME/s3Report.csv',
                            'aws s3 cp $HOME/s3Report.csv s3://' + os.environ['bucketpath'] + '/',
                            'printf "Hi All, csv file has been generated successfully." | mailx -vvv -s "report" -r "noreply@ramasankarmolleti.com" -S smtp="smtp" "sankar276@gmail.com"'
                        ]
                        instance_ids = [instance["InstanceId"]]
                        print(nodename)
                        print(instance_ids)
                        execute_ssm_command(ssm_client, commands, instance_ids)

Parameters:

1
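The send_command response also includes a CommandId, which can be used to check whether the shell script actually succeeded on the instance. A minimal sketch of how execute_ssm_command could poll the result (the polling interval and error handling here are my own assumptions, not part of the original lambda):

import time
import boto3

ssm_client = boto3.client('ssm')

def execute_ssm_command(client, commands, instance_ids):
    resp = client.send_command(
        DocumentName="AWS-RunShellScript",
        Parameters={'commands': commands},
        InstanceIds=instance_ids,
    )
    command_id = resp['Command']['CommandId']
    time.sleep(2)  # give SSM a moment to register the invocation
    for instance_id in instance_ids:
        # Poll the invocation until it leaves the Pending/InProgress states
        while True:
            invocation = client.get_command_invocation(
                CommandId=command_id,
                InstanceId=instance_id,
            )
            if invocation['Status'] not in ('Pending', 'InProgress'):
                break
            time.sleep(5)
        print(instance_id, invocation['Status'])
        print(invocation['StandardOutputContent'])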

Second Method (Using Pandas module)


import boto3
import psycopg2
import pandas as pd
from io import StringIO

conn_string = "host='dbinstancename' dbname='databasename' user='username' password='password'"
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
print("Connected")

tablename = 'report'
date = '2019-07-11'
code = 'RAMA'
# cursor.execute("SELECT * FROM " + tablename + " where date = '" + date + "' and code = '" + code + "';")
cursor.execute("SELECT * FROM " + tablename + " limit 100;")
myresult = cursor.fetchall()

item_list = []
for i in myresult:
    item = {'col1': i[0],
            'col2': i[1],
            'col3': i[2],
            'col4': i[3],
            'col5': i[4],
            'col6': i[5],
            'col7': i[6],
            'col8': i[7],
            'col9': i[8]}
    item_list.append(item)

df = pd.DataFrame(data=item_list, columns=['col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7', 'col8', 'col9'])
print(df.head(40))

# Export the dataframe to csv in memory
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
print('csv generated')

# Push the csv to s3 using the boto3 s3 resource
bucket = 'bucketname'  # target s3 bucket
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'rama.csv').put(Body=csv_buffer.getvalue())

You can schedule the lambda using a CloudWatch Events rule that runs every 5 minutes to keep the data in S3 up to date, for example as shown below.
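A rough sketch of creating that 5-minute schedule with boto3 instead of the console (the rule name, function name, and ARN below are placeholders, not from the original setup):

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:generate-report'  # placeholder ARN

# Rule that fires every 5 minutes
rule = events.put_rule(
    Name='generate-report-every-5-minutes',
    ScheduleExpression='rate(5 minutes)',
    State='ENABLED',
)

# Allow CloudWatch Events to invoke the lambda
lambda_client.add_permission(
    FunctionName='generate-report',
    StatementId='allow-cloudwatch-events',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)

# Point the rule at the lambda
events.put_targets(
    Rule='generate-report-every-5-minutes',
    Targets=[{'Id': 'generate-report-target', 'Arn': function_arn}],
)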

Hope you enjoyed the post.

Cheers

Ramasankar Molleti

LinkedIn

Posted in AWS, AWS Lambda, AWS SSM, PostgreSQL, S3

Lambda to delete indices older than ‘x’ days on Elastic Search

This is a simple example of how we can delete indices older than 'x' days on Elasticsearch.


import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator

host = 'XXXXXXXXXXXXXXXX.us-east-1.es.amazonaws.com' # Provide the elasticsearch endpoint
region = 'us-east-1' # Provide the region
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

# Lambda execution starts here.
def lambda_handler(event, context):

    # Build the Elasticsearch client.
    es = Elasticsearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )

    index_list = curator.IndexList(es)
    # Delete the indices for the pattern yyyy-mm-dd* with creation_date older than x days.
    # Source https://curator.readthedocs.io/en/latest/examples.html
    index_list.filter_by_age(source='creation_date', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=7)

    print("Found %s indices to delete" % len(index_list.indices))

    if index_list.indices:
        curator.DeleteIndices(index_list).do_action()
        print('Indices deleted successfully')

This example needs the requests_aws4auth, elasticsearch, and elasticsearch-curator modules installed. You can build these modules on a Linux machine; I've used an EC2 instance running Amazon Linux.
# Install dependencies
yum -y install python-pip zip
pip install virtualenv

# Create the virtual environment
mkdir -p /var/es-cleanup && cd /var/es-cleanup
virtualenv /var/es-cleanup
cd /var/es-cleanup && source bin/activate
pip install requests_aws4auth -t .
pip install elasticsearch -t .
pip install elasticsearch-curator -t .

# Copy the code to current directory and set the file permission to execute mode
chmod 754 es-cleanup.py

# Package the lambda
zip -r /var/es-cleanup.zip *

# Send the package to S3 Bucket
aws s3 cp /var/es-cleanup.zip s3://BUCKET_NAME/
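Once the zip is in S3, the Lambda itself can be created from it. A minimal sketch with boto3 (the function name, role ARN, runtime, and handler below are assumptions for illustration, not the exact values used here; the handler assumes the script is saved as es_cleanup.py, since Lambda cannot import a module name containing a hyphen):

import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_function(
    FunctionName='es-cleanup',
    Runtime='python3.7',                                      # assumed runtime
    Role='arn:aws:iam::123456789012:role/es-cleanup-role',    # placeholder role ARN
    Handler='es_cleanup.lambda_handler',                      # module.function inside the zip
    Code={'S3Bucket': 'BUCKET_NAME', 'S3Key': 'es-cleanup.zip'},
    Timeout=300,
    MemorySize=256,
)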

Hope you enjoyed the post.

Cheers

Ramasankar Molleti

LinkedIn

Posted in Uncategorized

How To Create Jupyter Notebook using Amazon SageMaker

In this blog post, I would like to present how to create a Jupyter Notebook and run various commands using Amazon SageMaker.

What is Jupyter Notebook?

The Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations and narrative text. Uses include: data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more.  For more information you may refer here.

What is Amazon Sagemaker?

Amazon SageMaker is a fully-managed service that enables data scientists and developers to quickly and easily build, train, and deploy machine learning models at any scale. Amazon SageMaker includes modules that can be used together or independently to build, train, and deploy your machine learning models.  For more information you may refer here.

Let's create a Jupyter notebook using SageMaker.

Step 1: Open Amazon SageMaker

1

Step 2: Click on Create notebook instance

2.PNG

Step 3: Create a notebook instance named “MyJupyterNotebook”

If you want to speed up throughput and decrease the latency of getting real-time inferences from deep learning models deployed as Amazon SageMaker hosted models, you can select Elastic Inference. In this example, I'm leaving this as none.

You will have to create an IAM role upon instance creation, or use an existing IAM role ARN that has enough permissions to create the Jupyter notebook instance.

You can choose to deploy SageMaker into a dedicated VPC so that the instance is created in a private subnet for internal access. In this example I'm leaving this option as none so that I can access it over the internet.

You can configure a lifecycle configuration to run shell scripts upon instance creation.

You can specify the volume size of the instance. This depends on how much data you want to store on the notebook. I leave it as the default.

You can select a Git repository while the SageMaker notebook is being created, so that Jupyter will start in the specified repository. If you would like to encrypt the instance you can use AWS KMS.

Tags: You can specify the name of the notebook and its purpose for easy identification.

Click on Create notebook instance; this will create the Jupyter notebook instance as below.

3.PNG

This will take a few minutes depending on the size of the instance we chose.

4.PNG

As you can see, the instance was created successfully. Let's open the Jupyter notebook by clicking Open Jupyter under the Actions tab.

5.PNG

There you go, you have successfully created a Jupyter notebook.

Let’s run some commands on the notebook.

Click on New at the top right corner and you will see many different kernels to run the commands. I will select conda_python3.

6.PNG

After you select conda_python3, this will open in a new window as below.

7.PNG

I've created an S3 bucket called “testrama2019” and I will be copying the content from S3 to the local Jupyter notebook. I will be using AWS CLI terminal commands. When you use terminal commands you need to specify “!” before the command.

8.PNG

Let's execute by clicking Run, or you can use Shift+Enter to run the command.

9.PNG

If you look at the results, I have a sample CSV file which I will be reading after importing the pandas module.

10.PNG

That's it. As you can see above, I was able to read the content of the CSV file using the pandas module. A minimal sketch of the equivalent cells is shown below.
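A rough equivalent of those cells as a single Python snippet (the object key sample.csv is a placeholder for whatever file is in the bucket; in the notebook the copy step can equally be done with an !aws s3 cp cell, as in the screenshots):

import boto3
import pandas as pd

# Download the object from the bucket created above (the key name is an assumed placeholder)
s3 = boto3.client('s3')
s3.download_file('testrama2019', 'sample.csv', 'sample.csv')

# Read the csv into a dataframe and show the first rows
df = pd.read_csv('sample.csv')
print(df.head())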

Hope you enjoyed the post.

Cheers

Ramasankar Molleti

LinkedIn

Posted in AWS, Jupyter, Sagemaker

Python Script – Create and Insert data into Dynamodb table

In this example, I would like to demonstrate how to create an AWS DynamoDB table using Python.

I'm taking a simple Employee table which contains Id, FirstName, LastName, Dept and Sal columns. Also, I'm going to create a partition key on Id and a sort key on Sal. I will use boto3 to call the DynamoDB service. For more information about boto3 you can refer here.


import boto3

# Create a table Employee
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

mytable = dynamodb.create_table(
    TableName='Employee',
    KeySchema=[
        {
            'KeyType': 'HASH',
            'AttributeName': 'Id'
        },
        {
            'KeyType': 'RANGE',
            'AttributeName': 'Sal'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'Id',
            'AttributeType': 'N'
        },
        {
            'AttributeName': 'Sal',
            'AttributeType': 'N'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 2,
        'WriteCapacityUnits': 2
    }
)

# Wait until the table creation completes.
mytable.meta.client.get_waiter('table_exists').wait(TableName='Employee')
print('Table has been created, please continue to insert data.')

If you look at the attribute definitions while creating the table, I only specified the partition key and sort key and did not define any other column names (FirstName, LastName, Dept). With DynamoDB (a NoSQL database) you don't need to specify every record attribute ahead of time; you only need to specify the hash and range fields.

This will create a table called Employee as below

Dyanamodb2

Dyanamodb1.PNG

As you can see, the table Employee was created with partition key Id and sort key Sal.

Let's insert data into the table. Use the below script to insert the data; you can use the put_item method to insert data into DynamoDB. You can see the syntax here.


mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Ramasankar',
        'LastName': 'Molleti',
        'Dept': 'IT',
        'Sal': 5000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Sourav',
        'LastName': 'Mukherjee',
        'Dept': 'IT',
        'Sal': 10000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Praveen',
        'LastName': 'Kumar',
        'Dept': 'Finance',
        'Sal': 5000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Suresh',
        'LastName': 'Kumar',
        'Dept': 'Finance',
        'Sal': 12000
    }
)

response = mytable.scan()

for i in response['Items']:
    print("added item:", i['Id'], ":", i['FirstName'], ":", i['LastName'], ":", i['Dept'], ":", i['Sal'])

Output:

added item: 1 : Praveen : Kumar : Finance : 5000
added item: 1 : Sourav : Mukherjee : IT : 10000
added item: 1 : Suresh : Kumar : Finance : 12000

Process finished with exit code 0

Dyanamodb3.PNG

As you can see, the data has been inserted. Note that only three items appear even though four put_item calls were made: the first item (Id 1, Sal 5000) was overwritten by a later put_item with the same partition and sort key, since put_item replaces an existing item with the same primary key. That's it for creating and inserting data into DynamoDB. Here is the combined script. In this example I also set the provisioned read and write throughput to 2 instead of the default values.


import boto3

# Create a table called Employee
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

mytable = dynamodb.create_table(
    TableName='Employee',
    KeySchema=[
        {
            'KeyType': 'HASH',
            'AttributeName': 'Id'
        },
        {
            'KeyType': 'RANGE',
            'AttributeName': 'Sal'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'Id',
            'AttributeType': 'N'
        },
        {
            'AttributeName': 'Sal',
            'AttributeType': 'N'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 2,
        'WriteCapacityUnits': 2
    }
)

# Wait until the table exists.
mytable.meta.client.get_waiter('table_exists').wait(TableName='Employee')
print('Table is ready, please continue to insert data.')

# Insert the data into the dynamodb table
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Ramasankar',
        'LastName': 'Molleti',
        'Dept': 'IT',
        'Sal': 5000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Sourav',
        'LastName': 'Mukherjee',
        'Dept': 'IT',
        'Sal': 10000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Praveen',
        'LastName': 'Kumar',
        'Dept': 'Finance',
        'Sal': 5000
    }
)
mytable.put_item(
    Item={
        'Id': 1,
        'FirstName': 'Suresh',
        'LastName': 'Kumar',
        'Dept': 'Finance',
        'Sal': 12000
    }
)

response = mytable.scan()

for i in response['Items']:
    print("added item:", i['Id'], ":", i['FirstName'], ":", i['LastName'], ":", i['Dept'], ":", i['Sal'])

Hope you enjoyed the post.

Cheers

Ramasankar Molleti

LinkedIn

Posted in AWS, DynamoDB

AWS S3 —> AWS Lambda —> SSM —> SQL Server Job

Scenario:

The application runs on Java and the database is PostgreSQL. The application user uploads data, and this data needs to be processed to an analytics server as well. There is no direct access to the analytics database server, which runs SQL Server in a different location. You need to architect a solution to process this data automatically as soon as it is uploaded from the application. Also, the application user may upload “N” files per day, but the process has to pick up the latest among them for that particular day.

My Solution:

  1. Application uploads the data in a CSV format to S3 bucket
  2. Create AWS lambda to process the data to analytic server
  3. Create an s3 event on the S3 bucket
  4. Lambda will trigger SQL Server agent job and process the csv file to Analytic Database Server
  5. SQL Agent job on analytic server picks up the latest file on the s3 bucket and process the data
Step 1: Application uploads the data in a CSV format to S3 bucket

In this example, I've created an S3 bucket (testcommandbucket).

1

Let’s upload the data. I’ve uploaded some sample files to demonstrate.

2.PNG

Step 2:  Create AWS lambda to process the data to analytic server

I've created a lambda called uploaddata in Python 2.7 as below.

3.PNG


import boto3
import time
import sys
import os

ec2 = boto3.client('ec2')
s3 = boto3.client('s3')
ssm_client = boto3.client('ssm')

def lambda_handler(event, context):
    if event:
        print("Event: ", event)
        file_obj = event["Records"][0]
        filename = str(file_obj['s3']['object']['key'])
        print("filename: ", filename)
        if 'AnalyticData' in filename:
            def execute_ssm_command(client, commands, instance_ids):
                throttling = "False"
                sleepTime = 1
                while throttling == "False":
                    try:
                        resp = client.send_command(
                            DocumentName="AWS-RunPowerShellScript",
                            Parameters={'commands': commands},
                            InstanceIds=instance_ids,
                        )
                        throttling = "True"
                    except Exception as e:
                        print(e)
                        print("throttling")
                        sleepTime = sleepTime * 20
                        time.sleep(sleepTime)

            instance_ids = ['i-02742a8']
            commands = ['sqlcmd -S AnalysisDBServer -Q "EXEC msdb.dbo.sp_start_job \'SqlJob_Process_Data\'"']
            execute_ssm_command(ssm_client, commands, instance_ids)
            # print(instance["Name"])

# lambda_handler(None, None)

In the above example, you can see I'm sending the SSM command as a PowerShell script to start a SQL Server Agent job. I'm also handling throttling in case the lambda is executed by concurrent events.

Note: You can run the above code on Python 2.7 or 3.6.
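If you want the retry to back off gradually and only on throttling errors, a variation like the following could be used. This is my own sketch, not the deployed code; it assumes the throttling surfaces as a botocore ClientError with error code ThrottlingException:

import time
from botocore.exceptions import ClientError

def execute_ssm_command_with_backoff(client, commands, instance_ids, max_attempts=5):
    delay = 1
    for attempt in range(max_attempts):
        try:
            return client.send_command(
                DocumentName="AWS-RunPowerShellScript",
                Parameters={'commands': commands},
                InstanceIds=instance_ids,
            )
        except ClientError as e:
            # Re-raise anything that is not a throttling error
            if e.response['Error']['Code'] != 'ThrottlingException':
                raise
            time.sleep(delay)
            delay *= 2  # exponential backoff
    raise RuntimeError('send_command was throttled on every attempt')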

Step 3:  Create an s3 event on the S3 bucket

Let's create an S3 event to trigger the lambda when files get uploaded to the S3 bucket.

4.PNG

If you look at the above screenshot, I've created an S3 event that calls the lambda and filters the S3 bucket data to *.csv files.

Step 4:  Lambda will trigger SQL Server agent job and process the csv file to Analytic Database Server

I've uploaded the latest file to S3. You can see in the lambda CloudWatch logs below that the lambda was triggered and processed the latest file, AnalyticData-2018-12-22.csv.

5.PNG

This lambda executes an SSM command against the database server EC2 instance to start the SQL Agent job. You can see from the SSM command logs below that the job was triggered.

7.PNG

Step 5: SQL Agent job on analytic server picks up the latest file on the s3 bucket and process the data

As the final step, let's check whether the SQL job was triggered.

6.PNG

You can see that the job was executed successfully.

What does the sql job do?

5.1. Copies the latest file from s3 bucket to local server

5.2. Import the csv data to analysis table using bulk openrowset method

5.3. Move the processed file to processed folder

5.4. Trigger the SSRS subscription to send reports as email to stake holders

Steps 5.1 through 5.4 are outside the scope of this article.

Hope you enjoyed the post!

Cheers

Ramasankar Molleti

LinkedIn


Posted in AWS, AWS Lambda, AWS SSM, PostgreSQL, S3, SQL Server 2016

AWS Lambda to connect to PostgreSQL and execute a function/query using Python

It's been a long time since I wrote a blog post. In this post I would like to show an example of a lambda that connects to a PostgreSQL database and executes a query.

  1. Build a PostgreSQL psycopg2 module in order to connect to the database. I've built a custom compiled Python library to connect to the latest PostgreSQL 9.6 database using this. You may download the package from here
  2. Write a sample lambda that updates the url in the table by calling a database function.
    import psycopg2

    def lambda_handler(event, context):
        conn_string = "dbname='test' port='5432' user='username' password='password' host='RDSEndpoint'"
        conn = psycopg2.connect(conn_string)
        cursor = conn.cursor()
        cursor.execute("select system_env_host('ramasankarmolleti.com')")
        conn.commit()
        cursor.close()
        print("working")
  3. You may also download python code here 
  4. Now, let’s create a lambda function
  5. Lambda_1
  6. In the above screenshot I've created a test_lambda_role IAM role to give access to lambda services. Once you create the lambda, combine both step 1 and step 2 as one zip file and upload the zip file to the lambda; you may download the combined zip file here
  7. Lambda_2
  8. Next comes access to the database from the lambda. You need to create a security group that has access to the PostgreSQL database from the lambda. I've created a security group and attached it to the lambda.
  9. Now we have the lambda deployed; let's run the lambda for testing. Click on Test to test the lambda. This will prompt you to create a test event as below
  10. Lambda_3.PNG
  11. After creating the test event you can run the lambda by clicking Test. Before that, make sure the lambda function name is updated to the function name you created. In this example the function name is UpdateHost_Python, as shown below
  12. Lambda_4
  13. Now, click on Test to run the lambda (a boto3 alternative to the console Test button is sketched after this list).
  14. Lambda_5.PNG
  15. Here you go, the lambda executed successfully
  16. Lambda_6.PNG
  17. Let's see the results in the database. You can see that the host is updated to ramasankarmolleti.com.
  18. Update_Host_Python

Hope you enjoyed the post!

Cheers

Ramasankar Molleti

LinkedIn

Posted in AWS, PostgreSQL, Uncategorized

Powershell Script to change all windows scheduled tasks credentials

One of my students asked me whether there is a simple script to update the credentials of all Windows scheduled tasks in one step. Here is a simple way to do it:

$taskname = Get-ScheduledTask -TaskPath '\' | select TaskName
foreach ($task in $taskname)
{
Set-ScheduledTask -Password "password" -User "user" -TaskName $task.TaskName
}

Hope you enjoyed the post!

Cheers

Ramasankar Molleti

MSDN:LinkedIn:Twitter

Posted in Uncategorized