4
4
5
5
using System ;
6
6
using System . Data ;
7
- using System . Text ;
7
+ using System . Threading . Tasks ;
8
8
using Xunit ;
9
9
10
10
namespace Microsoft . Data . SqlClient . ManualTesting . Tests
11
11
{
12
- public class TestBulkCopyWithUtf8
12
+ public sealed class TestBulkCopyWithUtf8 : IDisposable
13
13
{
14
- private readonly string _connectionString ;
14
+ private static string sourceTable = DataTestUtility . GetUniqueName ( "SrcUtf8DataTable" ) ;
15
+ private static string destinationTable = DataTestUtility . GetUniqueName ( "DstUtf8DataTable" ) ;
16
+ private static readonly string insertQuery = $ "INSERT INTO{ sourceTable } VALUES('test')";
15
17
16
18
public TestBulkCopyWithUtf8 ( )
17
19
{
18
- _connectionString = new SqlConnectionStringBuilder ( DataTestUtility . TCPConnectionString ) { MultipleActiveResultSets = true } . ConnectionString ;
20
+ using SqlConnection sourceConnection = new SqlConnection ( GetConnectionString ( true ) ) ;
21
+ sourceConnection . Open ( ) ;
22
+ SetupTables ( sourceConnection , sourceTable , destinationTable , insertQuery ) ;
23
+ }
24
+
25
+ private string GetConnectionString ( bool enableMars )
26
+ {
27
+ return new SqlConnectionStringBuilder ( DataTestUtility . TCPConnectionString )
28
+ {
29
+ MultipleActiveResultSets = enableMars
30
+ } . ConnectionString ;
19
31
}
20
32
21
33
private void SetupTables ( SqlConnection connection , string sourceTable , string destinationTable , string insertQuery )
22
34
{
23
35
string columnDefinition = "(str_col varchar(max) COLLATE Latin1_General_100_CS_AS_KS_WS_SC_UTF8)" ;
24
36
DataTestUtility . CreateTable ( connection , sourceTable , columnDefinition ) ;
25
37
DataTestUtility . CreateTable ( connection , destinationTable , columnDefinition ) ;
26
-
27
38
using SqlCommand insertCommand = connection . CreateCommand ( ) ;
28
39
insertCommand . CommandText = insertQuery ;
29
40
Helpers . TryExecute ( insertCommand , insertQuery ) ;
30
41
}
31
42
32
- [ ConditionalFact ( typeof ( DataTestUtility ) , nameof ( DataTestUtility . AreConnStringsSetup ) , nameof ( DataTestUtility . IsNotAzureServer ) , nameof ( DataTestUtility . IsNotAzureSynapse ) ) ]
33
- public void BulkCopy_Utf8Data_ShouldMatchSource ( )
43
+ [ ConditionalTheory ( typeof ( DataTestUtility ) ,
44
+ nameof ( DataTestUtility . AreConnStringsSetup ) ,
45
+ nameof ( DataTestUtility . IsNotAzureServer ) ,
46
+ nameof ( DataTestUtility . IsNotAzureSynapse ) ) ]
47
+ [ InlineData ( true , true ) ]
48
+ [ InlineData ( false , true ) ]
49
+ [ InlineData ( true , false ) ]
50
+ [ InlineData ( false , false ) ]
51
+ public void BulkCopy_Utf8Data_ShouldMatchSource ( bool isMarsEnabled , bool enableStreaming )
34
52
{
35
- string sourceTable = DataTestUtility . GetUniqueName ( "SrcUtf8DataTable" ) ;
36
- string destinationTable = DataTestUtility . GetUniqueName ( "DstUtf8DataTable" ) ;
37
- string insertQuery = $ "INSERT INTO{ sourceTable } VALUES('test')";
38
-
39
- using SqlConnection sourceConnection = new SqlConnection ( _connectionString ) ;
53
+ using SqlConnection sourceConnection = new SqlConnection ( GetConnectionString ( isMarsEnabled ) ) ;
40
54
sourceConnection . Open ( ) ;
41
- SetupTables ( sourceConnection , sourceTable , destinationTable , insertQuery ) ;
42
-
43
55
using SqlCommand countCommand = new SqlCommand ( $ "SELECT COUNT(*) FROM{ destinationTable } ", sourceConnection ) ;
44
56
long initialCount = Convert . ToInt64 ( countCommand . ExecuteScalar ( ) ) ;
45
-
46
57
using SqlCommand sourceDataCommand = new SqlCommand ( $ "SELECT str_col FROM{ sourceTable } ", sourceConnection ) ;
47
58
using SqlDataReader reader = sourceDataCommand . ExecuteReader ( CommandBehavior . SequentialAccess ) ;
48
-
49
- using SqlConnection destinationConnection = new SqlConnection ( _connectionString ) ;
59
+ using SqlConnection destinationConnection = new SqlConnection ( GetConnectionString ( isMarsEnabled ) ) ;
50
60
destinationConnection . Open ( ) ;
51
-
61
+
52
62
using SqlBulkCopy bulkCopy = new SqlBulkCopy ( destinationConnection )
53
63
{
54
- EnableStreaming = true ,
64
+ EnableStreaming = enableStreaming ,
55
65
DestinationTableName = destinationTable
56
66
} ;
57
67
@@ -63,20 +73,75 @@ public void BulkCopy_Utf8Data_ShouldMatchSource()
63
73
{
64
74
Assert . Fail ( $ "Bulk copy failed:{ ex . Message } ") ;
65
75
}
66
-
76
+
77
+ reader . Close ( ) ;
67
78
long finalCount = Convert . ToInt64 ( countCommand . ExecuteScalar ( ) ) ;
68
79
Assert . Equal ( 1 , finalCount - initialCount ) ;
69
-
70
80
using SqlCommand verifyCommand = new SqlCommand ( $ "SELECT cast(str_col as varbinary) FROM{ destinationTable } ", destinationConnection ) ;
71
81
using SqlDataReader verifyReader = verifyCommand . ExecuteReader ( CommandBehavior . SequentialAccess ) ;
72
-
73
82
byte [ ] expectedBytes = new byte [ ] { 0x74 , 0x65 , 0x73 , 0x74 } ;
74
-
75
83
Assert . True ( verifyReader . Read ( ) , "No data found in destination table after bulk copy." ) ;
84
+ byte [ ] actualBytes = verifyReader . GetSqlBinary ( 0 ) . Value ;
85
+ Assert . Equal ( expectedBytes . Length , actualBytes . Length ) ;
86
+ Assert . Equal ( expectedBytes , actualBytes ) ;
87
+ }
88
+
89
+
90
+ [ ConditionalTheory ( typeof ( DataTestUtility ) ,
91
+ nameof ( DataTestUtility . AreConnStringsSetup ) ,
92
+ nameof ( DataTestUtility . IsNotAzureServer ) ,
93
+ nameof ( DataTestUtility . IsNotAzureSynapse ) ) ]
94
+ [ InlineData ( true , true ) ]
95
+ [ InlineData ( false , true ) ]
96
+ [ InlineData ( true , false ) ]
97
+ [ InlineData ( false , false ) ]
98
+ public async Task BulkCopy_Utf8Data_ShouldMatchSource_Async ( bool isMarsEnabled , bool enableStreaming )
99
+ {
100
+ string connectionString = GetConnectionString ( isMarsEnabled ) ;
101
+ using SqlConnection sourceConnection = new SqlConnection ( connectionString ) ;
102
+ await sourceConnection . OpenAsync ( ) ;
103
+ using SqlCommand countCommand = new SqlCommand ( $ "SELECT COUNT(*) FROM{ destinationTable } ", sourceConnection ) ;
104
+ long initialCount = Convert . ToInt64 ( await countCommand . ExecuteScalarAsync ( ) ) ;
105
+ using SqlCommand sourceDataCommand = new SqlCommand ( $ "SELECT str_col FROM{ sourceTable } ", sourceConnection ) ;
106
+ using SqlDataReader reader = await sourceDataCommand . ExecuteReaderAsync ( CommandBehavior . SequentialAccess ) ;
107
+ using SqlConnection destinationConnection = new SqlConnection ( connectionString ) ;
108
+ await destinationConnection . OpenAsync ( ) ;
109
+
110
+ using SqlBulkCopy bulkCopy = new SqlBulkCopy ( destinationConnection )
111
+ {
112
+ EnableStreaming = enableStreaming ,
113
+ DestinationTableName = destinationTable
114
+ } ;
76
115
116
+ try
117
+ {
118
+ await bulkCopy . WriteToServerAsync ( reader ) ;
119
+ }
120
+ catch ( Exception ex )
121
+ {
122
+ Assert . Fail ( $ "Bulk copy failed:{ ex . Message } ") ;
123
+ }
124
+
125
+ reader . Close ( ) ;
126
+ long finalCount = Convert . ToInt64 ( await countCommand . ExecuteScalarAsync ( ) ) ;
127
+ Assert . Equal ( 1 , finalCount - initialCount ) ;
128
+ using SqlCommand verifyCommand = new SqlCommand ( $ "SELECT cast(str_col as varbinary) FROM{ destinationTable } ", destinationConnection ) ;
129
+ using SqlDataReader verifyReader = await verifyCommand . ExecuteReaderAsync ( CommandBehavior . SequentialAccess ) ;
130
+ byte [ ] expectedBytes = new byte [ ] { 0x74 , 0x65 , 0x73 , 0x74 } ;
131
+ Assert . True ( await verifyReader . ReadAsync ( ) , "No data found in destination table after bulk copy." ) ;
77
132
byte [ ] actualBytes = verifyReader . GetSqlBinary ( 0 ) . Value ;
78
133
Assert . Equal ( expectedBytes . Length , actualBytes . Length ) ;
79
134
Assert . Equal ( expectedBytes , actualBytes ) ;
80
135
}
136
+
137
+
138
+ public void Dispose ( )
139
+ {
140
+ using SqlConnection connection = new SqlConnection ( GetConnectionString ( true ) ) ;
141
+ connection . Open ( ) ;
142
+ DataTestUtility . DropTable ( connection , sourceTable ) ;
143
+ DataTestUtility . DropTable ( connection , destinationTable ) ;
144
+ connection . Close ( ) ;
145
+ }
81
146
}
82
147
}