11 changes: 8 additions & 3 deletions be/src/exec/file_scanner/csv_scanner.cpp
@@ -590,7 +590,10 @@ void CSVScanner::_report_rejected_record(const CSVReader::Record& record, const
_state->append_rejected_record_to_file(record.to_string(), err_msg, _curr_reader->filename());
}

static TypeDescriptor get_type_desc(const Slice& field) {
static TypeDescriptor get_type_desc(const Slice& field, const bool& sampleTypes) {
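// When type sampling is disabled, report every column as VARCHAR.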
if (!sampleTypes) {
return TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
}
StringParser::ParseResult result;

StringParser::string_to_int<int64_t>(field.get_data(), field.get_size(), &result);
@@ -642,7 +645,8 @@ Status CSVScanner::_get_schema(std::vector<SlotDescriptor>* merged_schema) {
_curr_reader->split_record(record, &fields);
for (size_t i = 0; i < fields.size(); i++) {
// column name: $1, $2, $3...
schema.emplace_back(i, fmt::format("${}", i + 1), get_type_desc(fields[i]));
schema.emplace_back(i, fmt::format("${}", i + 1),
get_type_desc(fields[i], _scan_range.params.schema_sample_types));
}
schemas.emplace_back(schema);
i++;
@@ -679,7 +683,8 @@ Status CSVScanner::_get_schema_v2(std::vector<SlotDescriptor>* merged_schema) {
const Slice field(basePtr + column.start_pos, column.length);

// column name: $1, $2, $3...
schema.emplace_back(i, fmt::format("${}", i + 1), get_type_desc(field));
schema.emplace_back(i, fmt::format("${}", i + 1),
get_type_desc(field, _scan_range.params.schema_sample_types));
}
schemas.emplace_back(schema);
i++;
59 changes: 59 additions & 0 deletions be/test/exec/file_scanner/csv_scanner_test.cpp
@@ -1308,6 +1308,65 @@ TEST_P(CSVScannerTest, test_get_schema) {
EXPECT_EQ(expected_schema[i].second, schema[i].type().type) << schema[i].col_name();
}
}

{
// sample 1 row, enclose ", escape "\"
std::vector<std::pair<std::string, LogicalType>> expected_schema = {
{"$1", TYPE_BIGINT}, {"$2", TYPE_VARCHAR}, {"$3", TYPE_DOUBLE}, {"$4", TYPE_BOOLEAN}};

std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc range;
range.__set_path("./be/test/exec/test_data/csv_scanner/type_sniff.csv");
range.__set_num_of_columns_from_file(0);
ranges.push_back(range);

TBrokerScanRangeParams* params = _obj_pool.add(new TBrokerScanRangeParams());
params->__set_row_delimiter('\n');
params->__set_column_separator(',');
params->__set_enclose('"');
params->__set_escape('\\');
params->__set_schema_sample_file_row_count(1);
auto scanner = create_csv_scanner({}, ranges, params);
EXPECT_OK(scanner->open());
std::vector<SlotDescriptor> schema;
EXPECT_OK(scanner->get_schema(&schema));
EXPECT_EQ(expected_schema.size(), schema.size());

for (size_t i = 0; i < schema.size(); i++) {
EXPECT_EQ(expected_schema[i].first, schema[i].col_name());
EXPECT_EQ(expected_schema[i].second, schema[i].type().type) << schema[i].col_name();
}
}

{
// sample 1 row, enclose ", escape "\", no type detection
std::vector<std::pair<std::string, LogicalType>> expected_schema = {
{"$1", TYPE_VARCHAR}, {"$2", TYPE_VARCHAR}, {"$3", TYPE_VARCHAR}, {"$4", TYPE_VARCHAR}};

std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc range;
range.__set_path("./be/test/exec/test_data/csv_scanner/type_sniff.csv");
range.__set_num_of_columns_from_file(0);
ranges.push_back(range);

TBrokerScanRangeParams* params = _obj_pool.add(new TBrokerScanRangeParams());
params->__set_row_delimiter('\n');
params->__set_column_separator(',');
params->__set_enclose('"');
params->__set_escape('\\');
params->__set_schema_sample_file_row_count(1);
params->__set_schema_sample_types(false);
auto scanner = create_csv_scanner({}, ranges, params);
EXPECT_OK(scanner->open());
std::vector<SlotDescriptor> schema;
EXPECT_OK(scanner->get_schema(&schema));
EXPECT_EQ(expected_schema.size(), schema.size());

for (size_t i = 0; i < schema.size(); i++) {
EXPECT_EQ(expected_schema[i].first, schema[i].col_name());
EXPECT_EQ(expected_schema[i].second, schema[i].type().type) << schema[i].col_name();
}
}
}

TEST_P(CSVScannerTest, test_flexible_column_mapping) {
1 change: 1 addition & 0 deletions be/test/exec/test_data/csv_scanner/type_sniff.csv
@@ -0,0 +1 @@
001,Jok\,e,"21.3",false
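The single fixture row exercises each sniffing path: `001` parses as an integer, `Jok\,e` only as a string, `"21.3"` becomes a double once the enclose quotes are stripped, and `false` parses as a boolean. As a hedged sketch (the path is a placeholder and the `DESC FILES(...)` probe follows the documented syntax; neither is part of this PR), a schema probe over such a file would look like:

```sql
-- Sketch only: the s3 path is hypothetical; the csv.* and auto_detect_*
-- property names follow the documented FILES() parameters.
DESC FILES(
    "path" = "s3://bucket/type_sniff.csv",
    "format" = "csv",
    "csv.enclose" = '"',
    "csv.escape" = "\\",
    "auto_detect_sample_rows" = "1"
);
-- With type detection on, this should report:
-- $1 BIGINT, $2 VARCHAR, $3 DOUBLE, $4 BOOLEAN
```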
2 changes: 2 additions & 0 deletions docs/en/sql-reference/sql-functions/table-functions/files.md
@@ -218,6 +218,7 @@ You can configure the sampling rule using the following parameters:

- `auto_detect_sample_files`: the number of random data files to sample in each batch. By default, the first and last files are selected. Range: `[0, + ∞]`. Default: `2`.
- `auto_detect_sample_rows`: the number of data rows to scan in each sampled data file. Range: `[0, + ∞]`. Default: `500`.
- `auto_detect_types`: (valid for CSV only) whether to infer the data types of the sampled columns or assume `STRING` for all of them. `{true | false}`. Default: `true`.

After the sampling, StarRocks unionizes the columns from all the data files according to these rules:

@@ -227,6 +228,7 @@ After the sampling, StarRocks unionizes the columns from all the data files according to these rules:
- Integer columns together with `FLOAT` type columns will be unionized as the DECIMAL type.
- String types are used for unionizing other types.
- Generally, the `STRING` type can be used to unionize all data types.
- If type auto-detection is turned off, all columns are returned as the `STRING` type.

You can refer to Example 5.
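For instance, a minimal sketch of the new parameter in use (the bucket path is a placeholder):

```sql
SELECT * FROM FILES(
    "path" = "s3://bucket/data/*.csv",
    "format" = "csv",
    -- Skip type sniffing: every detected column is returned as STRING.
    "auto_detect_types" = "false"
);
```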

2 changes: 2 additions & 0 deletions docs/ja/sql-reference/sql-functions/table-functions/files.md
@@ -218,6 +218,7 @@ From v3.2 onward, `FILES()` supports automatic schema detection of data files in the same batch

- `auto_detect_sample_files`: the number of random data files to sample in each batch. By default, the first and last files are selected. Range: `[0, + ∞]`. Default: `2`.
- `auto_detect_sample_rows`: the number of data rows to scan in each sampled data file. Range: `[0, + ∞]`. Default: `500`.
- `auto_detect_types`: (valid for CSV only) whether to infer the data types of the sampled columns or assume `STRING` for all of them. `{true | false}`. Default: `true`.

After the sampling, StarRocks unionizes the columns from all the data files according to the following rules:

@@ -227,6 +228,7 @@ From v3.2 onward, `FILES()` supports automatic schema detection of data files in the same batch
- Integer columns together with `FLOAT` type columns will be unionized as the DECIMAL type.
- String types are used for unionizing other types.
- Generally, the `STRING` type can be used to unionize all data types.
- If type auto-detection is turned off, all columns are returned as the `STRING` type.

You can refer to Example 5.

2 changes: 2 additions & 0 deletions docs/zh/sql-reference/sql-functions/table-functions/files.md
@@ -218,6 +218,7 @@ CSV format example:

- `auto_detect_sample_files`: the number of random data files to sample in each batch. By default, the first and last files are selected. Range: `[0, + ∞]`. Default: `2`.
- `auto_detect_sample_rows`: the number of data rows to scan in each sampled data file. Range: `[0, + ∞]`. Default: `500`.
- `auto_detect_types`: (valid for CSV only) whether to infer the data types of the sampled columns or assume `STRING` for all of them. `{true | false}`. Default: `true`.

After the sampling, StarRocks unionizes the columns from all the data files according to the following rules:

@@ -227,6 +228,7 @@ CSV format example:
- Integer columns together with `FLOAT` type columns will be unionized as the DECIMAL type.
- String types are used for unionizing other types.
- Generally, the `STRING` type can be used to unionize all data types.
- If type auto-detection is turned off, all columns are returned as the `STRING` type.

You can refer to Example 5.

fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java
@@ -127,6 +127,7 @@

public static final String PROPERTY_AUTO_DETECT_SAMPLE_FILES = "auto_detect_sample_files";
public static final String PROPERTY_AUTO_DETECT_SAMPLE_ROWS = "auto_detect_sample_rows";
public static final String PROPERTY_AUTO_DETECT_TYPES = "auto_detect_types";

private static final String PROPERTY_FILL_MISMATCH_COLUMN_WITH = "fill_mismatch_column_with";

@@ -177,6 +178,7 @@
// for load/query data
private int autoDetectSampleFiles = DEFAULT_AUTO_DETECT_SAMPLE_FILES;
private int autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;
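// Whether to infer column types when sampling schemas (CSV only); set via auto_detect_types.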
private boolean autoDetectTypes = true;

private List<String> columnsFromPath = new ArrayList<>();
private boolean strictMode = false;
@@ -504,6 +506,21 @@
}
}

if (properties.containsKey(PROPERTY_AUTO_DETECT_TYPES)) {
String property = properties.get(PROPERTY_AUTO_DETECT_TYPES);
if (property.equalsIgnoreCase("true")) {
autoDetectTypes = true;
} else if (property.equalsIgnoreCase("false")) {
autoDetectTypes = false;
} else {
throw new DdlException(
String.format(
"Illegal value of %s: %s, only true/false allowed", PROPERTY_AUTO_DETECT_TYPES, property
)
);
}
}

if (properties.containsKey(PROPERTY_CSV_COLUMN_SEPARATOR)) {
csvColumnSeparator = Delimiter.convertDelimiter(properties.get(PROPERTY_CSV_COLUMN_SEPARATOR));
int len = csvColumnSeparator.getBytes(StandardCharsets.UTF_8).length;
@@ -604,6 +621,7 @@
params.setProperties(properties);
params.setSchema_sample_file_count(autoDetectSampleFiles);
params.setSchema_sample_file_row_count(autoDetectSampleRows);
params.setSchema_sample_types(autoDetectTypes);
params.setEnclose(csvEnclose);
params.setEscape(csvEscape);
params.setSkip_header(csvSkipHeader);
@@ -783,7 +801,7 @@
return csvTrimSpace;
}

public void parsePropertiesForUnload(List<Column> columns, SessionVariable sessionVariable) {

List<String> columnNames = columns.stream()
.map(Column::getName)
.collect(Collectors.toList());
fe/fe-core/src/test/java/com/starrocks/catalog/TableFunctionTableTest.java
@@ -42,6 +42,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -362,4 +363,49 @@
"Invalid parquet.version: '2.0'. Expected values should be 2.4, 2.6, 1.0",
() -> new TableFunctionTable(new ArrayList<>(), properties, new SessionVariable()));
}

@Test
public void testAutoDetectTypes() throws NoSuchFieldException {

Map<String, String> properties = new HashMap<>();
properties.put(TableFunctionTable.PROPERTY_PATH, "fake://test/path");
properties.put(TableFunctionTable.PROPERTY_FORMAT, "csv");

Field field = TableFunctionTable.class.getDeclaredField("autoDetectTypes");
field.setAccessible(true);

{
// Case 1: default
Assertions.assertDoesNotThrow(() -> {
TableFunctionTable table = new TableFunctionTable(properties);
Assertions.assertTrue((Boolean) field.get(table));
});
}

{
// Case 2: auto_detect_types = false
properties.put(TableFunctionTable.PROPERTY_AUTO_DETECT_TYPES, "false");
Assertions.assertDoesNotThrow(() -> {
TableFunctionTable table = new TableFunctionTable(properties);
Assertions.assertFalse((Boolean) field.get(table));
});
}

{
// Case 3: auto_detect_types = true
properties.put(TableFunctionTable.PROPERTY_AUTO_DETECT_TYPES, "true");
Assertions.assertDoesNotThrow(() -> {
TableFunctionTable table = new TableFunctionTable(properties);
Assertions.assertTrue((Boolean) field.get(table));
});
}

{
// abnormal
properties.put(TableFunctionTable.PROPERTY_AUTO_DETECT_TYPES, "notaboolean");
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
"Illegal value of auto_detect_types: notaboolean, only true/false allowed",
() -> new TableFunctionTable(properties)
);
}
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
@@ -284,6 +284,7 @@ struct TBrokerScanRangeParams {
31: optional i64 schema_sample_file_row_count
32: optional bool flexible_column_mapping
33: optional TFileScanType file_scan_type
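// Whether to sniff column types when sampling CSV schemas; when unset, the
// default of true preserves the old always-detect behavior.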
34: optional bool schema_sample_types = true
}

// Broker scan range
Expand Down
Loading