Using Information_Schema to get Identity/PK/NonPK/All columns, FK dependencies and concatenate columns  

Posted by ReelTym


----------------------------------------------------------
-- Grab list of tables
----------------------------------------------------------
if object_id( 'tempdb..#tmpTables' ) is not null drop table #tmpTables
create table #tmpTables
(
ordinal int identity(1,1),
table_catalog sysname,
table_schema sysname,
table_name sysname,
Sequence int default(1),
SequenceNbr int NULL,
HasIdentityCol bit default(0)
)
insert into #tmpTables ( table_catalog, table_schema, table_name )
select
t.table_catalog,
t.table_schema,
t.table_name
from information_schema.tables t (nolock)

----------------------------------------------------------
-- Load temp tables with table/column/constraint data from information_schema views
----------------------------------------------------------
if object_id( 'tempdb..#tmpColumns' ) is not null drop table #tmpColumns
select i.* into #tmpColumns from information_schema.columns i join #tmpTables t on i.table_catalog = t.table_catalog and i.table_schema = t.table_schema and i.table_name = t.table_name
create clustered index ix_tmpColumns_csn on #tmpColumns ( table_catalog, table_schema, table_name, column_name )

if object_id( 'tempdb..#tmpTblConstraints' ) is not null drop table #tmpTblConstraints
select i.* into #tmpTblConstraints from information_schema.table_constraints i join #tmpTables t on i.table_catalog = t.table_catalog and i.table_schema = t.table_schema and i.table_name = t.table_name
create clustered index ix_tmpTblConstraints_csn on #tmpTblConstraints ( table_catalog, table_schema, table_name )
create index ix_tmpTblConstraints_c on #tmpTblConstraints ( constraint_catalog, constraint_schema, constraint_name )

if object_id( 'tempdb..#tmpKeyColUsage' ) is not null drop table #tmpKeyColUsage
select i.* into #tmpKeyColUsage from information_schema.key_column_usage i join #tmpTables t on i.table_catalog = t.table_catalog and i.table_schema = t.table_schema and i.table_name = t.table_name
create clustered index ix_tmpKeyColUsage_csn on #tmpKeyColUsage ( table_catalog, table_schema, table_name, column_name )

if object_id( 'tempdb..#tmpRefConstraints' ) is not null drop table #tmpRefConstraints
select i.* into #tmpRefConstraints
from information_schema.referential_constraints i
join #tmpTblConstraints c
on ( i.constraint_catalog = c.constraint_catalog
and i.constraint_schema = c.constraint_schema
and i.constraint_name = c.constraint_name )
or ( i.unique_constraint_catalog = c.constraint_catalog
and i.unique_constraint_schema = c.constraint_schema
and i.unique_constraint_name = c.constraint_name )
create clustered index ix_tmpRefConstraints_c on #tmpRefConstraints ( unique_constraint_catalog, unique_constraint_schema, unique_constraint_name, constraint_catalog, constraint_schema, constraint_name )

----------------------------------------------------------
-- Check for the existance of LastUpdatedBy, UpdateDate and Identity columns
----------------------------------------------------------
update #tmpTables
set
HasIdentityCol = case when i.column_name is not null then 1 else 0 end
from #tmpTables t (nolock)
left join #tmpColumns i (nolock)
on t.table_catalog = i.table_catalog
and t.table_schema = i.table_schema
and t.table_name = i.table_name
and columnproperty( object_id( i.table_name ), i.column_name, 'IsIdentity' ) = 1

----------------------------------------------------------
-- Update load sequence based on dependencies
----------------------------------------------------------
while( @@rowcount > 0 )
update #tmpTables
set Sequence = s.Sequence + 1
from #tmpTables fkt
join (
select
table_catalog = fk.table_catalog,
table_schema = fk.table_schema,
table_name = fk.table_name,
Sequence = max( pkt.Sequence )
from #tmpTblConstraints fk (nolock)
join #tmpRefConstraints r (nolock)
on r.constraint_catalog = fk.constraint_catalog
and r.constraint_schema = fk.constraint_schema
and r.constraint_name = fk.constraint_name
join #tmpTblConstraints pk (nolock)
on r.unique_constraint_catalog = pk.constraint_catalog
and r.unique_constraint_schema = pk.constraint_schema
and r.unique_constraint_name = pk.constraint_name
and pk.constraint_type = 'primary key'
join #tmpTables pkt
on pk.table_catalog = pkt.table_catalog
and pk.table_schema = pkt.table_schema
and pk.table_name = pkt.table_name
where fk.constraint_type = 'foreign key'
and not
( fk.table_catalog = pk.table_catalog
and fk.table_schema = pk.table_schema
and fk.table_name = pk.table_name
)
group by fk.table_catalog, fk.table_schema, fk.table_name
) s
on fkt.table_catalog = s.table_catalog
and fkt.table_schema = s.table_schema
and fkt.table_name = s.table_name
and fkt.Sequence = s.Sequence

update #tmpTables
set SequenceNbr = s.SequenceNbr
from #tmpTables t
join (
select
table_catalog,
table_schema,
table_name,
SequenceNbr = rank() over( order by Sequence, table_catalog, table_schema, table_name )
from #tmpTables
) s on t.table_name = s.table_name

----------------------------------------------------------
-- Load temp tables with concattenated PK column data
----------------------------------------------------------
if object_id( 'tempdb..#tmpPKCols' ) is not null drop table #tmpPKCols
;with
pkcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name,
ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position ),
reverse_ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position desc )
from #tmpColumns c (nolock)
join #tmpKeyColUsage kc (nolock)
on c.table_catalog = kc.table_catalog
and c.table_schema = kc.table_schema
and c.table_name = kc.table_name
and c.column_name = kc.column_name
join #tmpTblConstraints tc (nolock)
on kc.table_catalog = tc.table_catalog
and kc.table_schema = tc.table_schema
and kc.table_name = tc.table_name
and kc.constraint_name = tc.constraint_name
join #tmpTables t (nolock)
on tc.table_catalog = t.table_catalog
and tc.table_schema = t.table_schema
and tc.table_name = t.table_name
where tc.constraint_type = 'primary key'
)
select * into #tmpPKCols from pkcols order by ordinal
if object_id( 'tempdb..#tmpConcatPKCols' ) is not null drop table #tmpConcatPKCols
;with concatpkcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
JoinOnPKCols = convert( varchar(max), '{source_alias_1}' + c.column_name + ' = {source_alias_2}' + c.column_name ),
ConcatPKCols = convert( varchar(max), 'coalesce( convert( varchar, ' + c.column_name + ' ), '''''''' )' ),
AllPKCols = convert( varchar(max), '{source_alias}' + c.column_name )
from #tmpPKCols c
where c.ordinal = 1
union all
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
JoinOnPKCols = convert( varchar(max), cpc.JoinOnPKCols + '{wrap}{source_alias_1}' + c.column_name + ' = {source_alias_2}' + c.column_name ),
ConcatPKCols = convert( varchar(max), cpc.ConcatPKCols + '{wrap}+ char(1) + coalesce( convert( varchar, ' + c.column_name + ' ), '''''''' )' ),
AllPKCols = convert( varchar(max), cpc.AllPKCols + '{wrap}{source_alias}' + c.column_name )
from #tmpPKCols c
join concatpkcols cpc
on c.table_catalog = cpc.table_catalog
and c.table_schema = cpc.table_schema
and c.table_name = cpc.table_name
and c.ordinal = cpc.ordinal + 1
)
select * into #tmpConcatPKCols from concatpkcols where reverse_ordinal = 1

----------------------------------------------------------
-- Load temp tables with concattenated Non-PK column data
----------------------------------------------------------
if object_id( 'tempdb..#tmpNonPKCols' ) is not null drop table #tmpNonPKCols
;with nonpkcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name,
ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position ),
reverse_ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position desc )
from #tmpColumns c (nolock)
join #tmpTables t (nolock)
on c.table_catalog = t.table_catalog
and c.table_schema = t.table_schema
and c.table_name = t.table_name
left join #tmpPKCols pk
on c.table_catalog = pk.table_catalog
and c.table_schema = pk.table_schema
and c.table_name = pk.table_name
and c.column_name = pk.column_name
where pk.column_name is null and c.data_type not in ( 'text', 'image' )
)
select * into #tmpNonPKCols from nonpkcols order by ordinal
if object_id( 'tempdb..#tmpConcatNonPKCols' ) is not null drop table #tmpConcatNonPKCols
;with concatnonpkcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
DiffOnNonPKCols = convert( varchar(max), 'where nullif( {source_alias_1}' + c.column_name + ', {source_alias_2}' + c.column_name + ' ) is not null or nullif( {source_alias_2}' + c.column_name + ', {source_alias_1}' + c.column_name + ' ) is not null' ),
UpdateNonPKCols = convert( varchar(max), c.column_name + ' = a.' + c.column_name ),
JoinOnNonPKCols = convert( varchar(max), '{source_alias_1}' + c.column_name + ' = {source_alias_2}' + c.column_name ),
ConcatNonPKCols = convert( varchar(max), 'coalesce( convert( varchar, ' + c.column_name + ' ) , '''''''' )' ),
AllNonPKCols = convert( varchar(max), '{source_alias}' + c.column_name )
from #tmpNonPKCols c
where c.ordinal = 1
union all
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
DiffOnNonPKCols = convert( varchar(max), cnpc.DiffOnNonPKCols + '{wrap} or nullif( {source_alias_1}' + c.column_name + ', {source_alias_2}' + c.column_name + ' ) is not null or nullif( {source_alias_2}' + c.column_name + ', {source_alias_1}' + c.column_name + ' ) is not null' ),
UpdateNonPKCols = convert( varchar(max), cnpc.UpdateNonPKCols + '{wrap}' + c.column_name + ' = a.' + c.column_name ),
JoinOnNonPKCols = convert( varchar(max), cnpc.JoinOnNonPKCols + '{wrap}{source_alias_1}' + c.column_name + ' = {source_alias_2}' + c.column_name ),
ConcatNonPKCols = convert( varchar(max), cnpc.ConcatNonPKCols + '{wrap}+ char(1) + coalesce( convert( varchar, ' + c.column_name + ' ), '''''''' )' ),
AllNonPKCols = convert( varchar(max), cnpc.AllNonPKCols + '{wrap}{source_alias}' + c.column_name )
from #tmpNonPKCols c
join concatnonpkcols cnpc
on c.table_catalog = cnpc.table_catalog
and c.table_schema = cnpc.table_schema
and c.table_name = cnpc.table_name
and c.ordinal = cnpc.ordinal + 1
)
select * into #tmpConcatNonPKCols from concatnonpkcols where reverse_ordinal = 1

----------------------------------------------------------
-- Load temp tables with concattenated All column data
----------------------------------------------------------
if object_id( 'tempdb..#tmpAllCols' ) is not null drop table #tmpAllCols
;with allcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name,
ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position ),
reverse_ordinal = rank() over ( partition by c.table_name order by c.table_name, c.ordinal_position desc )
from #tmpColumns c (nolock)
join #tmpTables t (nolock)
on c.table_catalog = t.table_catalog
and c.table_schema = t.table_schema
and c.table_name = t.table_name
)
select * into #tmpAllCols from allcols
if object_id( 'tempdb..#tmpConcatAllCols' ) is not null drop table #tmpConcatAllCols
;with concatallcols
as
(
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
AllCols = convert( varchar(max), '{source_alias}' + c.column_name )
from #tmpAllCols c
where c.ordinal = 1
union all
select
c.table_catalog, c.table_schema, c.table_name, c.column_name, c.ordinal, c.reverse_ordinal,
AllCols = convert( varchar(max), cac.AllCols + '{wrap}{source_alias}' + c.column_name )
from #tmpAllCols c
join concatallcols cac
on c.table_catalog = cac.table_catalog
and c.table_schema = cac.table_schema
and c.table_name = cac.table_name
and c.ordinal = cac.ordinal + 1
)
select * into #tmpConcatAllCols from concatallcols where reverse_ordinal = 1

select * from #tmpTables
select * from #tmpColumns
select * from #tmpTblConstraints
select * from #tmpKeyColUsage
select * from #tmpRefConstraints
select * from #tmpPKCols
select * from #tmpConcatPKCols
select * from #tmpNonPKCols
select * from #tmpConcatNonPKCols
select * from #tmpAllCols
select * from #tmpConcatAllCols