Thursday, October 13, 2011

Programmatically Creating an SSIS Package with a Flat File Connection Source

I'm using the SSIS API to programmatically create (and execute) a package. The requirement for the first package is to get .csv data from a flat file into a SQL Server table. So I start checking into the MS docs, looks easy enough. Not so fast. When it comes to a flat file connection, there seem to be holes in the API. I ended up having to use one class that (according to the docs) I shouldn't be referencing in my code, but there doesn't seem to be any other way to make the flat file connection work, because it simply won't load column metadata from the file. Won't do it.

I found exactly one other sample of doing this (which was helpful), but that sample's method of reading the flat file's columns seemed less than optimal.

Following is a working sample. Sorry for formatting; I simply don't know any way to prevent the software from stripping out my formatting...

The data file looks like the following. Column names in first row.
"this","that"
"data","more data"
"other data","you get the point"

The SQL table is like yay:
CREATE TABLE [dbo].[bubba](
[this] [varchar](max) NULL,
[that] [varchar](max) NULL
) ON [PRIMARY]

The method below uses this helper class, which in turn relies on the TextFieldParser class — a very useful class. I have no idea why it's in the Microsoft.VisualBasic.FileIO namespace, but whatever works:


/// <summary>
/// Reads the first record of a delimited text file, intended for pulling
/// column names out of a header row.
/// </summary>
class FlatFileColumnReader
{
    /// <summary>
    /// Returns the fields of the first row of the file at <paramref name="path"/>.
    /// </summary>
    /// <param name="path">Full path to the text file.</param>
    /// <param name="delimiter">Field delimiter character (e.g. ',').</param>
    /// <param name="ft">Parsing mode (Delimited or FixedWidth).</param>
    /// <returns>The first row's fields; an empty list if the file has no rows.</returns>
    public List<string> Columns( string path, char delimiter, FieldType ft )
    {
        // TextFieldParser holds a stream over the file; the original version
        // never disposed it, leaking the file handle until finalization.
        using ( var tfp = new TextFieldParser( path ) { TextFieldType = ft } )
        {
            tfp.Delimiters = new string[] { delimiter.ToString() };

            // ReadFields() returns null at end-of-data (e.g. an empty file);
            // guard so callers get an empty list instead of a NullReferenceException.
            var fields = tfp.ReadFields();
            return fields == null ? new List<string>() : fields.ToList();
        }
    }
}



Here is the (big fat, but working) method. Requires following usings. The referenced assemblies were found on my machine in C:\Program Files (x86)\Microsoft SQL Server\100\SDK\Assemblies. Note that they're in the x86 dir. Thus, my application is set to target x86.
using System.Data.Common;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.VisualBasic.FileIO;


/// <summary>
/// Programmatically builds an SSIS package containing a single Data Flow Task that
/// reads a delimited flat file (column names in the first row) and bulk-inserts the
/// rows into a SQL Server table.  The flat file connection's columns are created by
/// hand because ReinitializeMetaData() will not populate them from the file.
/// The package is returned unvalidated and unexecuted.
/// </summary>
/// <param name="app">SSIS Application object, used only to look up pipeline component CreationNames.</param>
/// <returns>The constructed package, ready for the caller to validate/execute.</returns>
public Microsoft.SqlServer.Dts.Runtime.Package Generate( Microsoft.SqlServer.Dts.Runtime.Application app )

{

    /// Objects

    Microsoft.SqlServer.Dts.Runtime.Package _package;

    Executable _dataFlowTask;

    IDTSComponentMetaData100 _dataSource;

    IDTSComponentMetaData100 _dataDest;

    CManagedComponentWrapper _sourceInstance;

    CManagedComponentWrapper _destinationInstance;

    ConnectionManager _conMgrSource;

    ConnectionManager _conMgrDest;


    /// Create package and data flow task

    _package = new Microsoft.SqlServer.Dts.Runtime.Package();

    // Defer validation until execution, since connections/columns are built up below.
    _package.DelayValidation = true;

    _dataFlowTask = _package.Executables.Add( "STOCK:PipelineTask" );  // PipelineTask is a DataFlowTask ??

    // The TaskHost's InnerObject is the actual pipeline (MainPipe) we add components to.
    var pipe = (MainPipe)( (Microsoft.SqlServer.Dts.Runtime.TaskHost)_dataFlowTask ).InnerObject;

    pipe.Events = DtsConvert.GetExtendedInterface( new ComponentEvents() as IDTSComponentEvents );  // my ComponentEvents() just writes some stuff to debug for now


    /// Create connections.
    /// These string keys are the stock properties of the Flat File connection manager;
    /// SetValue takes the connection manager itself as the first argument.

    _conMgrSource = _package.Connections.Add( "FLATFILE" );

    _conMgrSource.Properties["Format"].SetValue( _conMgrSource, "Delimited" );

    _conMgrSource.Properties["Name"].SetValue( _conMgrSource, "Flat File Connection" );

    // For a FLATFILE connection manager the ConnectionString is just the file path.
    _conMgrSource.Properties["ConnectionString"].SetValue( _conMgrSource, @"C:\temp\Eeemport\bubba.txt" );

    _conMgrSource.Properties["ColumnNamesInFirstDataRow"].SetValue( _conMgrSource, true );

    _conMgrSource.Properties["HeaderRowDelimiter"].SetValue( _conMgrSource, "\r\n" );

    /// If you set the delimiter like this, it'll look correct if you open the resulting package in the UI, but it won't execute (unless you click "Reset Columns")

    //_conMgrSource.Properties["RowDelimiter"].SetValue( _conMgrSource, "{CR}{LF}" );

    _conMgrSource.Properties["TextQualifier"].SetValue( _conMgrSource, "\"" );

    _conMgrSource.Properties["DataRowsToSkip"].SetValue( _conMgrSource, 0 );


    _conMgrDest = _package.Connections.Add( "OLEDB" );

    // This provider wouldn't work

    //_conMgrDest.ConnectionString = @"Provider=Native OLE DB\SQL Server Native Client 10.0;Data Source=.\SQLEXPRESS;Initial Catalog=FASClient;Integrated Security=True";

    _conMgrDest.ConnectionString = @"Provider=SQLOLEDB.1;Integrated Security=SSPI;Persist Security Info=False;Initial Catalog=FASClient;Data Source=.\SQLEXPRESS";

    _conMgrDest.Name = "OLE DB Connection";

    _conMgrDest.Description = "OLE DB Connection";

    _conMgrDest.Properties["RetainSameConnection"].SetValue( _conMgrDest, true );


    /// Create the columns in the flat file connection.
    /// The header row is read out of the file ourselves (via TextFieldParser) because
    /// the connection manager will not load column metadata from the file on its own.

    var flatFileConnection = _conMgrSource.InnerObject as IDTSConnectionManagerFlatFile100;

    // NOTE(review): file path and delimiter are duplicated from the connection manager
    // settings above — presumably these should be kept in sync / factored into parameters.
    var fileColumns = new FlatFileColumnReader().Columns( @"C:\temp\Eeemport\bubba.txt", ',', FieldType.Delimited );

    for ( int i = 0; i < fileColumns.Count; i++ )

    {

        /// This object (IDTSConnectionManagerFlatFileColumn100) is not supposed to be referenced by my code according to doc:

        /// http://msdn.microsoft.com/en-us/library/microsoft.sqlserver.dts.runtime.wrapper.idtsconnectionmanagerflatfilecolumn100.aspx

        var column = flatFileConnection.Columns.Add();

        /// Last column delimiter must be newline.

        /// If you select "," for the column delimiter in the designer for a Flat File Connection, and the row delimiter is newline, it does this same thing...

        column.ColumnDelimiter = ( i == fileColumns.Count - 1 ) ? "\r\n" : ",";

        column.TextQualified = true;

        column.ColumnType = "Delimited";

        /// Here's one benefit of creating my own columns:

        /// My destination column in Sql Server is varchar.  The columns seem to be defaulted to DT_WSTR which won't go into a varchar column w/o being

        /// manually changed or run through a data converter component.

        column.DataType = DataType.DT_TEXT;

        column.DataPrecision = 0;

        column.DataScale = 0;

        // The column name itself is set through the IDTSName100 interface, not the column object.
        ( (IDTSName100)column ).Name = fileColumns[i];

    }


    /// Create Data Flow Components.
    /// Components are identified by CreationName (a ProgID/CLSID) looked up from the
    /// Application's PipelineComponentInfos by their friendly display name.

    _dataSource = pipe.ComponentMetaDataCollection.New();

    _dataSource.Name = "Flat File Source";

    _dataSource.ComponentClassID = app.PipelineComponentInfos["Flat File Source"].CreationName;

    // Skip external metadata validation on the source; the columns were built by hand above.
    _dataSource.ValidateExternalMetadata = false;


    _dataDest = pipe.ComponentMetaDataCollection.New();

    _dataDest.Name = "Sql Server Destination";

    _dataDest.ComponentClassID = app.PipelineComponentInfos["SQL Server Destination"].CreationName;


    ///// Create design instances

    _sourceInstance = _dataSource.Instantiate();

    _sourceInstance.ProvideComponentProperties();


    /// I think this junk must come after ProvideComponentProperties() above

    _dataSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface( _conMgrSource );

    _dataSource.RuntimeConnectionCollection[0].ConnectionManagerID = _conMgrSource.ID;


    // Standard design-time sequence: connect, refresh the component's metadata
    // from the connection, then disconnect.
    _sourceInstance.AcquireConnections( null );  // do we need to do this since we created our own columns?

    _sourceInstance.ReinitializeMetaData();

    _sourceInstance.ReleaseConnections();


    _destinationInstance = _dataDest.Instantiate();

    _destinationInstance.ProvideComponentProperties();


    /// I know SetComponentProperty can only be called after ProvideComponentProperties()

    /// To see available component properties, open an existing package (the XML) with an existing component of that type

    // BulkInsertTableName is specific to the SQL Server Destination component.
    _destinationInstance.SetComponentProperty( "BulkInsertTableName", "[dbo].[bubba]" );

    _dataDest.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface( _conMgrDest );

    _dataDest.RuntimeConnectionCollection[0].ConnectionManagerID = _conMgrDest.ID;

    _destinationInstance.AcquireConnections( null );

    _destinationInstance.ReinitializeMetaData();

    _destinationInstance.ReleaseConnections();


    //// Hook the path from source to dest

    var path = pipe.PathCollection.New();

    path.AttachPathAndPropagateNotifications( _dataSource.OutputCollection[0], _dataDest.InputCollection[0] );


    /// Do stuff with the virtual input (whatever the @#$% that is).
    /// The virtual input lists every upstream column available to the destination;
    /// SetUsageType marks each one as actually consumed (read-only) by the component.

    var virtualInput = _dataDest.InputCollection[0].GetVirtualInput();

    foreach ( IDTSVirtualInputColumn100 column in virtualInput.VirtualInputColumnCollection )

    {

        _destinationInstance.SetUsageType( _dataDest.InputCollection[0].ID, virtualInput, column.LineageID, DTSUsageType.UT_READONLY );

    }


    /// Map each input column to the destination's external metadata column of the
    /// same name — this assumes the flat file header names match the table's column
    /// names exactly (they do for the sample: "this" and "that").

    foreach ( IDTSInputColumn100 inputColumn in _dataDest.InputCollection[0].InputColumnCollection )

    {

        var outputColumn = _dataDest.InputCollection[0].ExternalMetadataColumnCollection[inputColumn.Name];

        outputColumn.Name = inputColumn.Name;

        _destinationInstance.MapInputColumn( _dataDest.InputCollection[0].ID, inputColumn.ID, outputColumn.ID );

    }


    //_package.Validate( _package.Connections, null, null, null );


    return _package;

}

No comments: