Friday, March 10, 2017

Parallel Messages Queues in SQL Server


The following code allows you to build XML messages and dropthem in a set of QUEUES and have SQL Server push them as quickly as they can go through in Parallel Queues

The following Insert statement loads a table with XML messages to be processed by a webservice.
It contains the URL for the message to be sent to.
It contains the MsgReq that is the message to be sent.
It contains the Status, priority, and some Application IDs to enable keeping track of the processed messages.
It contains a QUEUE.  This is assigned by Modding in the example below the ID by the value of 10.  This assigns a integer QUEUE value between 0 and 9.  This value of 10 can be increased to a max of maybe 200 queues.  This really depends on the server.

insert into [dbo].[WS_Messages_Queued]
       ( Queue , Priority , Get_Put_Ind , Url , MsgRsp , EnvGrp , Status , External_FICO_ID , External_FICO_ID_Description, MsgReq)
       select    cast(acct_id as int) % 10      as     Queue                       
              , 1                               as     Priority                    
              , 'P'                             as     Get_Put_Ind                 
              , 'http://' + @SRVR +':8080/CRSTitaniumWebServices/wsservices/tagAssociationService'       as     Url 
              , 'Not Processed Yet'             as     MsgRsp                      
              , @SRVR                           as     EnvGrp                      
              , 2                               as     Status                      
              , acct_id                         as     External_FICO_ID            
                           , 'CONSUMERACCOUNTTAG-agency-consumer-account-id---' + @in_fname                   as     External_FICO_ID_Description
                           ,  cast ('<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v1="http://www.crsoftwareinc.com/xml/ns/titanium/tag/tagAssociationService/v1_0">'
              + '    <soapenv:Header/>'
              + '    <soapenv:Body>'
              + '     <assign-consumer-account-tag>'
              + '       <tag-short-name>'
              +           case when rectype = 'CPEN' then 'PENDRCL'
                               when rectype = 'CFIN' then 'REQCLTRQ'
                               Else 'Error' end 
              + '</tag-short-name>'
              + '       <agency-consumer-account-id>' + cast([acct_id] as varchar) + '</agency-consumer-account-id>'
              + '     </assign-consumer-account-tag>'
              + '     </soapenv:Body>'
              + ' </soapenv:Envelope>' as text )       as     MsgReq
                           from #temp4
                           where rectype in ( 'CPEN' , 'CFIN'  )


This is the description of the Queued Message Table.
CREATE TABLE [dbo].[WS_Messages_Queued](
       [id] [int] IDENTITY(1,1) NOT NULL,
       [Queue] [int] NULL,
       [Priority] [int] NULL,
       [Get_Put_Ind] [char](1) NULL,
       [Url] [varchar](1024) NOT NULL,
       [MsgReq] [text] NOT NULL,
       [MsgRsp] [text] NULL,
       [EnvGrp] [varchar](32) NOT NULL,
       [SubInd] [int] NULL,
       [Status] [int] NOT NULL,
       [LoadedOn] [datetime] NOT NULL,
       [StartedOn] [datetime] NULL,
       [CompletedOn] [datetime] NULL,
       [External_ID] [varchar](500) NULL,
       [External_FICO_ID] [varchar](500) NULL,
       [External_FICO_ID_Description] [varchar](500) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [dbo].[WS_Messages_Queued] ADD  CONSTRAINT [DF_WS_Messages_Queued_LoadedOn]  DEFAULT (getdate()) FOR [LoadedOn]
GO





This is the description of the Processed Message Table.


CREATE TABLE [dbo].[WS_Messages_Processed](
       [id] [int] NOT NULL,
       [Queue] [int] NULL,
       [Priority] [int] NULL,
       [Get_Put_Ind] [char](1) NULL,
       [Url] [varchar](1024) NOT NULL,
       [MsgReq] [text] NOT NULL,
       [MsgRsp] [text] NULL,
       [EnvGrp] [varchar](32) NOT NULL,
       [SubInd] [int] NOT NULL,
       [Status] [int] NOT NULL,
       [LoadedOn] [datetime] NOT NULL,
       [StartedOn] [datetime] NOT NULL,
       [CompletedOn] [datetime] NULL,
       [External_ID] [varchar](500) NULL,
       [External_FICO_ID] [varchar](500) NULL,
       [External_FICO_ID_Description] [varchar](500) NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

This is the Stored proc that loads the message into the WEB-Service.  It actually calls on  Work.[dbo].[usp_HTTPRequest] in "COOL SQLServer code of the week - Part 2 -- USING SQL to work with WEBSERVICES in SQL Server"

On Yammer see : https://www.yammer.com/sfcg.com/#/inbox/show?threadId=456443782
On Blogger see: 
ALTER PROC [dbo].[sp_WS_Queued_MultiMessageSubmitter]
 (
      @in_Queue int,     
      @in_EnvGrp varchar(32)
 )
AS
  BEGIN
  SET nocount  ON
    DECLARE  @imax INT,
             @i    INT
    DECLARE  @id                            VARCHAR(100),
                      @Queue                         VARCHAR(400),
                      @Priority                      VARCHAR(400),
                      @Get_Put_Ind                   VARCHAR(400),
                      @Url                           VARCHAR(400),
                      @MsgReq                        VARCHAR(max),
                      @MsgRsp                        VARCHAR(max),
                      @EnvGrp                        VARCHAR(400),
                      @SubInd                        VARCHAR(400),
                      @Status                        VARCHAR(400),
                      @LoadedOn                      datetime,
                      @StartedOn                     datetime,
                      @CompletedOn                   datetime,
                      @External_ID                   VARCHAR(400),
                      @External_FICO_ID              VARCHAR(400),
                      @External_FICO_ID_Description  VARCHAR(400),
                      @RowID              int
       declare @xmlOut varchar(MAX)
       Declare @RequestText as varchar(MAX)

--update [dbo].[WS_Messages_Queued] set queue = 1


IF OBJECT_ID('tempdb..#QMsgList') IS NOT NULL drop table #QMsgList
SELECT TOP 250 IDENTITY(INT,1,1) AS RowID, cast(id as int) as ID ,Queue ,Priority ,Get_Put_Ind ,Url ,MsgReq ,MsgRsp ,EnvGrp ,SubInd ,Status ,LoadedOn ,StartedOn ,CompletedOn ,External_ID ,External_FICO_ID ,External_FICO_ID_Description  INTO #QMsgList FROM [dbo].[WS_Messages_Queued] NOLOCK where [Queue] = @in_Queue and EnvGrp = @in_EnvGrp and Status = 2 order by priority , id
    
    SET @imax = @@ROWCOUNT
    SET @i = 1
    
    WHILE (@i <= @imax)
      BEGIN

        SELECT @id                           = id                           ,
                     @Queue                        = Queue                        ,
               @Priority                     = Priority                     ,
               @Get_Put_Ind                  = Get_Put_Ind                  ,
               @Url                          = Url                          ,
               @MsgReq                       = MsgReq                       ,
               @MsgRsp                       = MsgRsp                       ,
               @EnvGrp                       = EnvGrp                       ,
               @SubInd                       = ''                           ,
               @Status                       = 4                            ,
               @LoadedOn                     = LoadedOn                     ,
               @StartedOn                    = getdate()                    ,
               @CompletedOn                  = CompletedOn                  ,
               @External_ID                  = External_ID                  ,
               @External_FICO_ID             = External_FICO_ID             ,
               @External_FICO_ID_Description = External_FICO_ID_Description
        FROM   #QMsgList
        WHERE  RowID = @i

        set     @StartedOn                    = getdate()        
        ------------------------------------------------------

        -- INSERT PROCESSING HERE

        ------------------------------------------------------

                                  set @RequestText= @MsgReq

                                  exec Work.[dbo].[usp_HTTPRequest]
                                   @Url,
                                  'POST',
                                   @RequestText,
                                  'SaveAREventWithShortNames',
                                  '', '', @xmlOut out

                                  --select @xmlOut 
          
        insert into [dbo].[WS_Messages_Processed]
        (
                        [id]
                       ,[Queue]
                       ,[Priority]
                       ,[Get_Put_Ind]
                       ,[Url]
                       ,[MsgReq]
                       ,[MsgRsp]
                       ,[EnvGrp]
                       ,[SubInd]
                       ,[Status]
                       ,[LoadedOn]
                       ,[StartedOn]
                       ,[CompletedOn]
                       ,[External_ID]
                       ,[External_FICO_ID]
                       ,[External_FICO_ID_Description]
              )
              values
        (
                        @id
                       ,@Queue
                       ,@Priority
                       ,@Get_Put_Ind
                       ,@Url
                       ,@MsgReq
                       ,@xmlOut
                       ,@EnvGrp
                       ,@SubInd
                       ,8
                       ,@LoadedOn
                       ,@StartedOn
                       ,getdate()
                       ,@External_ID
                       ,@External_FICO_ID
                       ,@External_FICO_ID_Description
              )


        Delete from  [dbo].[WS_Messages_Queued]
              where  [id] = @id


 --       PRINT CONVERT(varchar,@i)+' Message Processed : ' + @MsgReq + ' RESPONSE:  ' + @xmlOut
        
        SET @i = @i + 1
      END -- WHILE
  END -- SPROC




Next you  define a job for each QUEUE.



So starting a New Job like this:




Fill in Like this for each tab

Tab 1 - General -  the last numeric make the Queue number you are defining

Tab 2 - Steps

Tab 3 Alerts

Tab 4 - Notifications
 Tab 5 - Schedules



Here is some sample SQL Code to build your own JOB.

BEGIN TRANSACTION
DECLARE @ReturnCode INT
SELECT @ReturnCode = 0
/****** Object:  JobCategory [[Uncategorized (Local)]]]    Script Date: 10/30/2014 3:25:09 PM ******/
IF NOT EXISTS (SELECT name FROM msdb.dbo.syscategories WHERE name=N'[Uncategorized (Local)]' AND category_class=1)
BEGIN
EXEC @ReturnCode = msdb.dbo.sp_add_category @class=N'JOB', @type=N'LOCAL', @name=N'[Uncategorized (Local)]'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback

END

DECLARE @jobId BINARY(16)
EXEC @ReturnCode =  msdb.dbo.sp_add_job @job_name=N'WS_DM9_LOADER_Q0',
              @enabled=1,
              @notify_level_eventlog=0,
              @notify_level_email=2,
              @notify_level_netsend=0,
              @notify_level_page=0,
              @delete_level=0,
              @description=N'Webservice Queue 0.',
              @category_name=N'[Uncategorized (Local)]',
              @owner_login_name=N'WPI\dav_byr',
              @notify_email_operator_name=N'WPI_SQL_ALERT', @job_id = @jobId OUTPUT
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
/****** Object:  Step [Submit Qx]    Script Date: 10/30/2014 3:25:09 PM ******/
EXEC @ReturnCode = msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Submit Qx',
              @step_id=1,
              @cmdexec_success_code=0,
              @on_success_action=1,
              @on_success_step_id=0,
              @on_fail_action=2,
              @on_fail_step_id=0,
              @retry_attempts=0,
              @retry_interval=0,
              @os_run_priority=0, @subsystem=N'TSQL',
              @command=N'exec [dbo].[sp_WS_Queued_MultiMessageSubmitter]  0, wasvpap003',
              @database_name=N'WPI_WebServices',
              @flags=0
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_update_job @job_id = @jobId, @start_step_id = 1
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_add_jobschedule @job_id=@jobId, @name=N'WS_DM_Loader_Schedule',
              @enabled=1,
              @freq_type=4,
              @freq_interval=1,
              @freq_subday_type=4,
              @freq_subday_interval=1,
              @freq_relative_interval=0,
              @freq_recurrence_factor=0,
              @active_start_date=20140826,
              @active_end_date=99991231,
              @active_start_time=0,
              @active_end_time=235959,
              @schedule_uid=N'ffde7f7d-b038-4ec2-87fe-9b68ae0223d0'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
EXEC @ReturnCode = msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
COMMIT TRANSACTION
GOTO EndSave
QuitWithRollback:
    IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
EndSave:

GO


No comments:

Post a Comment