Friday, March 10, 2017

Defining a Web-services Parallel processing Controller

The following presentation demonstrates a preferred solution I architected at a former Healthcare company.

The problem: Too many web services were processing and connecting in an unorganized manner without any controls. Multiple approaches to loading efficiently caused the web-services API layer to become overburdened.

The solution:   Build a webservices parallel load controller.

Please review video.

The last two screens defined costs using a real-time queue through DataConnect.

We did not do that. Instead, we used the SQL Server database to house a message controller and built processes in DataConnect that were executed by a stand-alone engine (not Integration Manager). Windows Task Scheduler was then used to run "X" number of loader jobs, or queues.










XML Parsing IN SQL Server

COOL SQLServer code of the week
======================================


Hey Integration Fans....

Cool Code of the week.... Using Pervasive, I was trying to parse an XML file like the sample below. It had 125,000 Request segments and was taking 45 minutes to process. I changed to a delimited text source, like I had done at FirstCare, and did my own parsing, which brought the time down by 10%.

Then I tried the SQL code below to parse the XML inside of SQL Server, and that brought the parsing down to 125 seconds in 2 passes (one for each request type). Note the savings: from 2,700 seconds to 125, a reduction to 4.6% of the original time.
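The arithmetic behind those figures can be checked with a quick query. This is only a sanity check on the timings quoted above; the variable names are made up for illustration.

```sql
-- Timings quoted above: 45 minutes before, 125 seconds after.
DECLARE @before_seconds int = 45 * 60;   -- 2700
DECLARE @after_seconds  int = 125;

SELECT @before_seconds AS before_seconds,
       @after_seconds  AS after_seconds,
       CAST(100.0 * @after_seconds / @before_seconds AS decimal(5,1)) AS pct_of_original,  -- 4.6
       CAST(1.0 * @before_seconds / @after_seconds AS decimal(5,1))   AS speedup_factor;   -- 21.6
```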

Enjoy. This could be really helpful.

David


Sample XML showing schema....

<ETLWebServiceRequests>
<Request RequestType="CONSUMERTAG">
<assign-consumer-tag>
<tag-short-name>RpLexNex</tag-short-name>
<consumer-agency-identifier>1379376</consumer-agency-identifier>
</assign-consumer-tag>
</Request>
<Request RequestType="AREVENT">
<save-arevent-with-shortnames>
<consumer-agency-identifier>1601904</consumer-agency-identifier>
<action-code-shortname>CNSMSCR</action-code-shortname>
<result-code-shortname>SKPINFO</result-code-shortname>
<message-text>Received New score information from Skiptrace Lexis Nexis</message-text>
</save-arevent-with-shortnames>
</Request>
</ETLWebServiceRequests>


And here is the SQL:
----------------------------------

DECLARE @XML XML;
SELECT @XML = CAST((SELECT * FROM OPENROWSET (BULK '\\wasvpdb005\FileImport\LN_PostProcess\LN_Tag.xml' , SINGLE_BLOB) AS x) AS XML);
SELECT RequestNodes,
CAST( RequestNodes AS NVARCHAR(4000)) RequestNodeTxt,
(
SELECT T.c.value('.','varchar(8)')
FROM RequestNodes.nodes('/assign-consumer-tag/tag-short-name[1]') T(c)
) tag_short_name,
(
SELECT T.c.value('.','int')
FROM RequestNodes.nodes('/assign-consumer-tag/consumer-agency-identifier[1]') T(c)
) consumer_agency_identifier
FROM
(
SELECT T.c.query('.') AS RequestNodes
--, T.c.value('../@RequestType','varchar(50)') AS result
FROM @XML.nodes('/ETLWebServiceRequests/Request/*') T(c)
) RequestNodes

SELECT RequestNodes,
CAST( RequestNodes AS NVARCHAR(4000)) RequestNodeTxt,
(
SELECT T.c.value('.','varchar(8)')
FROM RequestNodes.nodes('/save-arevent-with-shortnames/action-code-shortname[1]') T(c)
) action_code_shortname,
(
-- message-text is free text, so it needs more room than the 8-character short names
SELECT T.c.value('.','varchar(255)')
FROM RequestNodes.nodes('/save-arevent-with-shortnames/message-text[1]') T(c)
) message_text,
(
-- result code (SKPINFO in the sample XML)
SELECT T.c.value('.','varchar(8)')
FROM RequestNodes.nodes('/save-arevent-with-shortnames/result-code-shortname[1]') T(c)
) result_code_shortname,
(
SELECT T.c.value('.','int')
FROM RequestNodes.nodes('/save-arevent-with-shortnames/consumer-agency-identifier[1]') T(c)
) consumer_agency_identifier
FROM
(
SELECT T.c.query('.') AS RequestNodes
--, T.c.value('../@RequestType','varchar(50)') AS result
FROM @XML.nodes('/ETLWebServiceRequests/Request/*') T(c)
) RequestNodes
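A more compact way to write the two passes, offered as a sketch rather than as the code that produced the timings above: filter on the RequestType attribute in the nodes() path so each pass only touches its own request type, and read the child elements with relative paths. The XML is inlined here (copied from the sample) so the example runs without the file share; the alias R(n) is arbitrary.

```sql
-- Inline copy of the sample so the sketch runs on its own.
DECLARE @XML xml = N'
<ETLWebServiceRequests>
  <Request RequestType="CONSUMERTAG">
    <assign-consumer-tag>
      <tag-short-name>RpLexNex</tag-short-name>
      <consumer-agency-identifier>1379376</consumer-agency-identifier>
    </assign-consumer-tag>
  </Request>
  <Request RequestType="AREVENT">
    <save-arevent-with-shortnames>
      <consumer-agency-identifier>1601904</consumer-agency-identifier>
      <action-code-shortname>CNSMSCR</action-code-shortname>
      <result-code-shortname>SKPINFO</result-code-shortname>
      <message-text>Received New score information from Skiptrace Lexis Nexis</message-text>
    </save-arevent-with-shortnames>
  </Request>
</ETLWebServiceRequests>';

-- Pass 1: CONSUMERTAG requests only.
SELECT R.n.value('(tag-short-name/text())[1]', 'varchar(8)')         AS tag_short_name,
       R.n.value('(consumer-agency-identifier/text())[1]', 'int')    AS consumer_agency_identifier
FROM @XML.nodes('/ETLWebServiceRequests/Request[@RequestType="CONSUMERTAG"]/assign-consumer-tag') AS R(n);

-- Pass 2: AREVENT requests only.
SELECT R.n.value('(consumer-agency-identifier/text())[1]', 'int')    AS consumer_agency_identifier,
       R.n.value('(action-code-shortname/text())[1]', 'varchar(8)')  AS action_code_shortname,
       R.n.value('(result-code-shortname/text())[1]', 'varchar(8)')  AS result_code_shortname,
       R.n.value('(message-text/text())[1]', 'varchar(255)')         AS message_text
FROM @XML.nodes('/ETLWebServiceRequests/Request[@RequestType="AREVENT"]/save-arevent-with-shortnames') AS R(n);
```

Each pass returns one row for the sample. Whether this form is faster than the nested-subquery form on a 125,000-segment file would need to be measured.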

Submitting a stored Proc Asynchronously


COOL CODE of the week:
    Be careful how you use this. It is powerful: it lets you start a stored proc (2) from within a stored proc (1) and continue on in the calling proc (1) while the called proc (2) is still running. I am thinking you could build one master stored proc to run the Atlas Queues. Note: the very bottom of the calling proc (1) should have a monitoring piece that watches for the queued procs to complete (and yes, the called proc (2) would have to write to a table to say it was done). When all of them have reported in, the calling proc (1) can finish.

http://www.databasejournal.com/features/mssql/article.php/3427581/Submitting-AStored-Procedure-Asynchronously.htm
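The monitoring piece at the bottom of the calling proc could look something like the sketch below. It assumes a status table, here called dbo.AsyncProcStatus, that each called proc updates as its last step; the table, its columns, the proc names, and the 30-minute limit are all hypothetical. How the called procs are actually started is covered by the linked article and is not shown.

```sql
-- Hypothetical status table; each called proc(2) sets IsDone = 1 as its last step.
IF OBJECT_ID('dbo.AsyncProcStatus') IS NULL
    CREATE TABLE dbo.AsyncProcStatus (
        ProcName    sysname  NOT NULL PRIMARY KEY,
        IsDone      bit      NOT NULL DEFAULT (0),
        CompletedOn datetime NULL
    );

-- Calling proc(1): register the work, then start the procs asynchronously (not shown).
DELETE FROM dbo.AsyncProcStatus;
INSERT INTO dbo.AsyncProcStatus (ProcName)
VALUES (N'Queue0Loader'), (N'Queue1Loader');

-- Bottom of calling proc(1): poll until every proc has reported in, or give up.
DECLARE @deadline datetime = DATEADD(MINUTE, 30, GETDATE());

WHILE EXISTS (SELECT 1 FROM dbo.AsyncProcStatus WHERE IsDone = 0)
BEGIN
    IF GETDATE() > @deadline
    BEGIN
        RAISERROR('Timed out waiting for queued procs to finish.', 16, 1);
        BREAK;
    END;
    WAITFOR DELAY '00:00:05';   -- check again in 5 seconds
END;
```

A permanent table is used rather than a # temp table because the called procs run in other sessions and could not see a session-local temp table.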


Using Webservices in Other Databases

And the Oracle Cloud Equivalent : https://cloud.oracle.com/database

And MYSQL : http://open-bi.blogspot.com/2012/11/call-restful-web-services-from-mysql.html

USING SQL to work with WEBSERVICES in SQL Server

COOL SQLServer code of the week - Part 2
USING SQL to work with WEBSERVICES in SQL Server
======================================
Note: the webservice in this does not give a response back.  Not sure what is wrong with the SQL

Hey Integration Fans....


Below is some pretty powerful code that lets you execute a call to a web service directly from SQL Server.

[dbo].[usp_HTTPRequest] - This stored proc makes the connection to the web service. It should probably be enhanced to handle retries, timeouts, XL Messages and such.

The parameters of this proc let you supply the URL the XML message is sent to, the XML message itself, and other potentially required information such as methodName, SoapAction, UserName, and Password.


David 

CODE:
======================================

USE [Work]
GO
/****** Object: StoredProcedure [dbo].[usp_HTTPRequest] Script Date: 8/14/2014 9:48:17 AM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

ALTER proc [dbo].[usp_HTTPRequest] (
@URI varchar(2000) = '', 
@methodName varchar(50) = '',
@requestBody varchar(8000) = '',
@SoapAction varchar(255),
@UserName nvarchar(100), -- Domain\UserName or UserName
@Password nvarchar(100),
@responseText varchar(8000) output )
as
SET NOCOUNT ON
IF @methodName = ''
BEGIN
select FailPoint = 'Method Name must be set'
return
END
set @responseText = 'FAILED'
DECLARE @objectID int
DECLARE @hResult int
DECLARE @source varchar(255), @desc varchar(255)
EXEC @hResult = sp_OACreate 'MSXML2.ServerXMLHTTP', @objectID OUT
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'Create failed',
MedthodName = @methodName
goto destroy
return
END
-- open the destination URI with Specified method
EXEC @hResult = sp_OAMethod @objectID, 'open', null, @methodName, @URI, 'false', @UserName, @Password
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'Open failed',
MedthodName = @methodName
goto destroy
return
END
-- set request headers
EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader', null, 'Content-Type', 'text/xml;charset=UTF-8'
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'SetRequestHeader failed',
MedthodName = @methodName
goto destroy
return
END
-- set soap action
EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader', null, 'SOAPAction', @SoapAction
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'SetRequestHeader failed',
MedthodName = @methodName
goto destroy
return
END
declare @len int
set @len = len(@requestBody)
EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader', null, 'Content-Length', @len
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'SetRequestHeader failed',
MedthodName = @methodName
goto destroy
return
END
/*
-- if you have headers in a table called RequestHeader you can go through them with this
DECLARE @HeaderKey varchar(500), @HeaderValue varchar(500)
DECLARE RequestHeader CURSOR
LOCAL FAST_FORWARD
FOR
SELECT HeaderKey, HeaderValue
FROM RequestHeaders
WHERE Method = @methodName
OPEN RequestHeader
FETCH NEXT FROM RequestHeader
INTO @HeaderKey, @HeaderValue
WHILE @@FETCH_STATUS = 0
BEGIN
--select @HeaderKey, @HeaderValue, @methodName
EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader', null, @HeaderKey, @HeaderValue
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'SetRequestHeader failed',
MedthodName = @methodName
goto destroy
return
END
FETCH NEXT FROM RequestHeader
INTO @HeaderKey, @HeaderValue
END
CLOSE RequestHeader
DEALLOCATE RequestHeader
*/
-- send the request
EXEC @hResult = sp_OAMethod @objectID, 'send', null, @requestBody
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'Send failed',
MedthodName = @methodName
goto destroy
return
END
declare @statusText varchar(1000), @status varchar(1000)
-- Get status text
exec sp_OAGetProperty @objectID, 'StatusText', @statusText out
exec sp_OAGetProperty @objectID, 'Status', @status out
select @status, @statusText, @methodName
-- Get response text
-- capture the return code so the check below tests this call, not the earlier 'send'
EXEC @hResult = sp_OAGetProperty @objectID, 'responseText', @responseText out
IF @hResult <> 0
BEGIN
EXEC sp_OAGetErrorInfo @objectID, @source OUT, @desc OUT
SELECT hResult = convert(varbinary(4), @hResult),
source = @source,
description = @desc,
FailPoint = 'ResponseText failed',
MedthodName = @methodName
goto destroy
return
END
destroy:
exec sp_OADestroy @objectID
SET NOCOUNT OFF
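A minimal sketch of calling the proc. The sp_OA* procedures are disabled by default, so the 'Ole Automation Procedures' server option has to be turned on first (this needs elevated rights). The URL, SOAP action, and request body are placeholders, not a real service.

```sql
-- One-time server setup: allow sp_OACreate and friends.
EXEC sp_configure 'show advanced options', 1;
RECONFIGURE;
EXEC sp_configure 'Ole Automation Procedures', 1;
RECONFIGURE;
GO

DECLARE @response varchar(8000);
DECLARE @body varchar(8000) =
      '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">'
    + '<soapenv:Header/><soapenv:Body/>'
    + '</soapenv:Envelope>';                              -- placeholder message

EXEC Work.dbo.usp_HTTPRequest
     @URI          = 'http://localhost:8080/exampleService',  -- placeholder URL
     @methodName   = 'POST',
     @requestBody  = @body,
     @SoapAction   = 'ExampleAction',                         -- placeholder action
     @UserName     = N'',
     @Password     = N'',
     @responseText = @response OUTPUT;

-- 'FAILED' here means the proc hit one of its error branches.
SELECT @response AS responseText;
```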

Parallel Messages Queues in SQL Server


The following code allows you to build XML messages, drop them in a set of QUEUES, and have SQL Server push them through parallel queues as quickly as they can go.

The following Insert statement loads a table with XML messages to be processed by a webservice.
It contains the URL for the message to be sent to.
It contains the MsgReq that is the message to be sent.
It contains the Status, priority, and some Application IDs to enable keeping track of the processed messages.
It contains a QUEUE. In the example below it is assigned by taking the ID modulo 10, which gives an integer QUEUE value between 0 and 9. The value of 10 can be increased, to a maximum of maybe 200 queues; the practical limit really depends on the server.

insert into [dbo].[WS_Messages_Queued]
       ( Queue , Priority , Get_Put_Ind , Url , MsgRsp , EnvGrp , Status , External_FICO_ID , External_FICO_ID_Description, MsgReq)
       select    cast(acct_id as int) % 10      as     Queue                       
              , 1                               as     Priority                    
              , 'P'                             as     Get_Put_Ind                 
              , 'http://' + @SRVR +':8080/CRSTitaniumWebServices/wsservices/tagAssociationService'       as     Url 
              , 'Not Processed Yet'             as     MsgRsp                      
              , @SRVR                           as     EnvGrp                      
              , 2                               as     Status                      
              , acct_id                         as     External_FICO_ID            
                           , 'CONSUMERACCOUNTTAG-agency-consumer-account-id---' + @in_fname                   as     External_FICO_ID_Description
                           ,  cast ('<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v1="http://www.crsoftwareinc.com/xml/ns/titanium/tag/tagAssociationService/v1_0">'
              + '    <soapenv:Header/>'
              + '    <soapenv:Body>'
              + '     <assign-consumer-account-tag>'
              + '       <tag-short-name>'
              +           case when rectype = 'CPEN' then 'PENDRCL'
                               when rectype = 'CFIN' then 'REQCLTRQ'
                               Else 'Error' end 
              + '</tag-short-name>'
              + '       <agency-consumer-account-id>' + cast([acct_id] as varchar) + '</agency-consumer-account-id>'
              + '     </assign-consumer-account-tag>'
              + '     </soapenv:Body>'
              + ' </soapenv:Envelope>' as text )       as     MsgReq
                           from #temp4
                           where rectype in ( 'CPEN' , 'CFIN'  )
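The modulo queue assignment used in the insert above can be tried in isolation. This sketch uses a handful of made-up account IDs and a @QueueCount variable (an assumption; the original hard-codes 10).

```sql
DECLARE @QueueCount int = 10;   -- number of parallel queues

SELECT a.acct_id,
       a.acct_id % @QueueCount AS Queue
FROM (VALUES (1379376), (1601904), (1601905), (1601913)) AS a(acct_id);
-- Queue values for these four IDs: 6, 4, 5, 3
```

How evenly the messages spread across queues depends on how the IDs are distributed; sequential IDs spread evenly, while IDs that share a last digit would all land in one queue.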


This is the description of the Queued Message Table.
CREATE TABLE [dbo].[WS_Messages_Queued](
       [id] [int] IDENTITY(1,1) NOT NULL,
       [Queue] [int] NULL,
       [Priority] [int] NULL,
       [Get_Put_Ind] [char](1) NULL,
       [Url] [varchar](1024) NOT NULL,
       [MsgReq] [text] NOT NULL,
       [MsgRsp] [text] NULL,
       [EnvGrp] [varchar](32) NOT NULL,
       [SubInd] [int] NULL,
       [Status] [int] NOT NULL,
       [LoadedOn] [datetime] NOT NULL,
       [StartedOn] [datetime] NULL,
       [CompletedOn] [datetime] NULL,
       [External_ID] [varchar](500) NULL,
       [External_FICO_ID] [varchar](500) NULL,
       [External_FICO_ID_Description] [varchar](500) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [dbo].[WS_Messages_Queued] ADD  CONSTRAINT [DF_WS_Messages_Queued_LoadedOn]  DEFAULT (getdate()) FOR [LoadedOn]
GO





This is the description of the Processed Message Table.


CREATE TABLE [dbo].[WS_Messages_Processed](
       [id] [int] NOT NULL,
       [Queue] [int] NULL,
       [Priority] [int] NULL,
       [Get_Put_Ind] [char](1) NULL,
       [Url] [varchar](1024) NOT NULL,
       [MsgReq] [text] NOT NULL,
       [MsgRsp] [text] NULL,
       [EnvGrp] [varchar](32) NOT NULL,
       [SubInd] [int] NOT NULL,
       [Status] [int] NOT NULL,
       [LoadedOn] [datetime] NOT NULL,
       [StartedOn] [datetime] NOT NULL,
       [CompletedOn] [datetime] NULL,
       [External_ID] [varchar](500) NULL,
       [External_FICO_ID] [varchar](500) NULL,
       [External_FICO_ID_Description] [varchar](500) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

This is the stored proc that sends the messages to the web service. It calls Work.[dbo].[usp_HTTPRequest] from "COOL SQLServer code of the week - Part 2 -- USING SQL to work with WEBSERVICES in SQL Server".

On Yammer see : https://www.yammer.com/sfcg.com/#/inbox/show?threadId=456443782
On Blogger see: 
ALTER PROC [dbo].[sp_WS_Queued_MultiMessageSubmitter]
 (
      @in_Queue int,     
      @in_EnvGrp varchar(32)
 )
AS
  BEGIN
  SET nocount  ON
    DECLARE  @imax INT,
             @i    INT
    DECLARE  @id                            VARCHAR(100),
                      @Queue                         VARCHAR(400),
                      @Priority                      VARCHAR(400),
                      @Get_Put_Ind                   VARCHAR(400),
                      @Url                           VARCHAR(400),
                      @MsgReq                        VARCHAR(max),
                      @MsgRsp                        VARCHAR(max),
                      @EnvGrp                        VARCHAR(400),
                      @SubInd                        VARCHAR(400),
                      @Status                        VARCHAR(400),
                      @LoadedOn                      datetime,
                      @StartedOn                     datetime,
                      @CompletedOn                   datetime,
                      @External_ID                   VARCHAR(400),
                      @External_FICO_ID              VARCHAR(400),
                      @External_FICO_ID_Description  VARCHAR(400),
                      @RowID              int
       declare @xmlOut varchar(MAX)
       Declare @RequestText as varchar(MAX)

--update [dbo].[WS_Messages_Queued] set queue = 1


IF OBJECT_ID('tempdb..#QMsgList') IS NOT NULL drop table #QMsgList
SELECT TOP 250 IDENTITY(INT,1,1) AS RowID, cast(id as int) as ID ,Queue ,Priority ,Get_Put_Ind ,Url ,MsgReq ,MsgRsp ,EnvGrp ,SubInd ,Status ,LoadedOn ,StartedOn ,CompletedOn ,External_ID ,External_FICO_ID ,External_FICO_ID_Description
INTO #QMsgList
-- the hint must be written WITH (NOLOCK); a bare NOLOCK after the table name is parsed as a table alias
FROM [dbo].[WS_Messages_Queued] WITH (NOLOCK)
where [Queue] = @in_Queue and EnvGrp = @in_EnvGrp and Status = 2
order by priority , id
    
    SET @imax = @@ROWCOUNT
    SET @i = 1
    
    WHILE (@i <= @imax)
      BEGIN

        SELECT @id                           = id                           ,
                     @Queue                        = Queue                        ,
               @Priority                     = Priority                     ,
               @Get_Put_Ind                  = Get_Put_Ind                  ,
               @Url                          = Url                          ,
               @MsgReq                       = MsgReq                       ,
               @MsgRsp                       = MsgRsp                       ,
               @EnvGrp                       = EnvGrp                       ,
               @SubInd                       = ''                           ,
               @Status                       = 4                            ,
               @LoadedOn                     = LoadedOn                     ,
               @StartedOn                    = getdate()                    ,
               @CompletedOn                  = CompletedOn                  ,
               @External_ID                  = External_ID                  ,
               @External_FICO_ID             = External_FICO_ID             ,
               @External_FICO_ID_Description = External_FICO_ID_Description
        FROM   #QMsgList
        WHERE  RowID = @i

        set     @StartedOn                    = getdate()        
        ------------------------------------------------------

        -- INSERT PROCESSING HERE

        ------------------------------------------------------

                                  set @RequestText= @MsgReq

                                  exec Work.[dbo].[usp_HTTPRequest]
                                   @Url,
                                  'POST',
                                   @RequestText,
                                  'SaveAREventWithShortNames',
                                  '', '', @xmlOut out

                                  --select @xmlOut 
          
        insert into [dbo].[WS_Messages_Processed]
        (
                        [id]
                       ,[Queue]
                       ,[Priority]
                       ,[Get_Put_Ind]
                       ,[Url]
                       ,[MsgReq]
                       ,[MsgRsp]
                       ,[EnvGrp]
                       ,[SubInd]
                       ,[Status]
                       ,[LoadedOn]
                       ,[StartedOn]
                       ,[CompletedOn]
                       ,[External_ID]
                       ,[External_FICO_ID]
                       ,[External_FICO_ID_Description]
              )
              values
        (
                        @id
                       ,@Queue
                       ,@Priority
                       ,@Get_Put_Ind
                       ,@Url
                       ,@MsgReq
                       ,@xmlOut
                       ,@EnvGrp
                       ,@SubInd
                       ,8
                       ,@LoadedOn
                       ,@StartedOn
                       ,getdate()
                       ,@External_ID
                       ,@External_FICO_ID
                       ,@External_FICO_ID_Description
              )


        Delete from  [dbo].[WS_Messages_Queued]
              where  [id] = @id


 --       PRINT CONVERT(varchar,@i)+' Message Processed : ' + @MsgReq + ' RESPONSE:  ' + @xmlOut
        
        SET @i = @i + 1
      END -- WHILE
  END -- SPROC
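Before scheduling anything, it can help to run one queue by hand and look at the two tables. A small sketch using the table and column names defined above; 'wasvpap003' is the environment group that appears in the sample job step, and Status = 2 is the "waiting" value the proc selects on.

```sql
-- Process up to 250 waiting messages from queue 0.
EXEC [dbo].[sp_WS_Queued_MultiMessageSubmitter] @in_Queue = 0, @in_EnvGrp = 'wasvpap003';

-- Messages still waiting, per queue.
SELECT Queue, COUNT(*) AS Waiting
FROM [dbo].[WS_Messages_Queued]
WHERE Status = 2
GROUP BY Queue
ORDER BY Queue;

-- Rough per-queue timing from the processed table.
SELECT Queue,
       COUNT(*) AS Processed,
       AVG(DATEDIFF(MILLISECOND, StartedOn, CompletedOn)) AS AvgMsPerMessage
FROM [dbo].[WS_Messages_Processed]
GROUP BY Queue
ORDER BY Queue;
```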




Next you define a SQL Server Agent job for each QUEUE.

Start a new job, then fill in each tab as follows:

Tab 1 - General - the trailing number in the job name is the Queue number you are defining

Tab 2 - Steps

Tab 3 - Alerts

Tab 4 - Notifications

Tab 5 - Schedules



Here is some sample SQL Code to build your own JOB.

BEGIN TRANSACTION
DECLARE @ReturnCode INT
SELECT @ReturnCode = 0
/****** Object:  JobCategory [[Uncategorized (Local)]]]    Script Date: 10/30/2014 3:25:09 PM ******/
IF NOT EXISTS (SELECT name FROM msdb.dbo.syscategories WHERE name=N'[Uncategorized (Local)]' AND category_class=1)
BEGIN
EXEC @ReturnCode = msdb.dbo.sp_add_category @class=N'JOB', @type=N'LOCAL', @name=N'[Uncategorized (Local)]'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback

END

DECLARE @jobId BINARY(16)
EXEC @ReturnCode =  msdb.dbo.sp_add_job @job_name=N'WS_DM9_LOADER_Q0',
              @enabled=1,
              @notify_level_eventlog=0,
              @notify_level_email=2,
              @notify_level_netsend=0,
              @notify_level_page=0,
              @delete_level=0,
              @description=N'Webservice Queue 0.',
              @category_name=N'[Uncategorized (Local)]',
              @owner_login_name=N'WPI\dav_byr',
              @notify_email_operator_name=N'WPI_SQL_ALERT', @job_id = @jobId OUTPUT
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
/****** Object:  Step [Submit Qx]    Script Date: 10/30/2014 3:25:09 PM ******/
EXEC @ReturnCode = msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Submit Qx',
              @step_id=1,
              @cmdexec_success_code=0,
              @on_success_action=1,
              @on_success_step_id=0,
              @on_fail_action=2,
              @on_fail_step_id=0,
              @retry_attempts=0,
              @retry_interval=0,
              @os_run_priority=0, @subsystem=N'TSQL',
              @command=N'exec [dbo].[sp_WS_Queued_MultiMessageSubmitter]  0, wasvpap003',
              @database_name=N'WPI_WebServices',
              @flags=0
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_update_job @job_id = @jobId, @start_step_id = 1
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_add_jobschedule @job_id=@jobId, @name=N'WS_DM_Loader_Schedule',
              @enabled=1,
              @freq_type=4,
              @freq_interval=1,
              @freq_subday_type=4,
              @freq_subday_interval=1,
              @freq_relative_interval=0,
              @freq_recurrence_factor=0,
              @active_start_date=20140826,
              @active_end_date=99991231,
              @active_start_time=0,
              @active_end_time=235959,
              @schedule_uid=N'ffde7f7d-b038-4ec2-87fe-9b68ae0223d0'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
COMMIT TRANSACTION
GOTO EndSave
QuitWithRollback:
    IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
EndSave:

GO
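Since every queue needs the same job with a different number, the jobs could also be created in a loop. The following is a sketch only: it reuses the job name pattern, step command, database name, and schedule settings from the script above, leaves out the notification and owner settings, and assumes 10 queues. The schedule name suffix is made up. It will raise an error for any job name that already exists.

```sql
USE msdb;
GO
DECLARE @QueueCount int = 10;               -- assumed number of queues
DECLARE @EnvGrp sysname = N'wasvpap003';    -- environment group from the job step above
DECLARE @q int = 0;
DECLARE @job sysname, @sched sysname, @cmd nvarchar(max);

WHILE @q < @QueueCount
BEGIN
    SET @job   = N'WS_DM9_LOADER_Q' + CAST(@q AS nvarchar(10));
    SET @sched = @job + N'_Schedule';       -- hypothetical schedule name
    SET @cmd   = N'exec [dbo].[sp_WS_Queued_MultiMessageSubmitter] '
               + CAST(@q AS nvarchar(10)) + N', ''' + @EnvGrp + N'''';

    EXEC dbo.sp_add_job
         @job_name = @job,
         @description = N'Webservice queue loader.';

    EXEC dbo.sp_add_jobstep
         @job_name = @job,
         @step_name = N'Submit Qx',
         @subsystem = N'TSQL',
         @command = @cmd,
         @database_name = N'WPI_WebServices';

    EXEC dbo.sp_add_jobschedule
         @job_name = @job,
         @name = @sched,
         @freq_type = 4, @freq_interval = 1,                 -- daily
         @freq_subday_type = 4, @freq_subday_interval = 1;   -- every 1 minute

    EXEC dbo.sp_add_jobserver
         @job_name = @job,
         @server_name = N'(local)';

    SET @q += 1;
END;
```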