Skip to content

Commit 0dc8fad

Browse files
committed
cas metric
1 parent 0b7f805 commit 0dc8fad

File tree

7 files changed

+77
-32
lines changed

7 files changed

+77
-32
lines changed

cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<configuration>
3939
<artifactSet>
4040
<excludes>
41-
41+
<exclude>org.slf4j</exclude>
4242
</excludes>
4343
</artifactSet>
4444
<filters>

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
<configuration>
5555
<artifactSet>
5656
<excludes>
57-
57+
<exclude>org.slf4j</exclude>
5858
</excludes>
5959
</artifactSet>
6060
<filters>

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,16 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
174174
return;
175175
}
176176
inputParams.add(equalObj);
177-
stringBuffer.append(sideInfo.getEqualFieldList().get(i))
178-
.append(" = ").append("'" + equalObj + "'")
179-
.append(" and ");
177+
StringBuffer sqlTemp = stringBuffer.append(sideInfo.getEqualFieldList().get(i))
178+
.append(" = ");
179+
if (equalObj instanceof String) {
180+
sqlTemp.append("'" + equalObj + "'")
181+
.append(" and ");
182+
} else {
183+
sqlTemp.append(equalObj)
184+
.append(" and ");
185+
}
186+
180187
}
181188

182189
String key = buildCacheKey(inputParams);
@@ -190,12 +197,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
190197
dealMissKey(input, resultFuture);
191198
return;
192199
} else if (ECacheContentType.MultiLine == val.getType()) {
193-
194-
for (Object rowArray : (List) val.getContent()) {
195-
Row row = fillData(input, rowArray);
196-
resultFuture.complete(Collections.singleton(row));
200+
List<Row> rowList = Lists.newArrayList();
201+
for (Object jsonArray : (List) val.getContent()) {
202+
Row row = fillData(input, jsonArray);
203+
rowList.add(row);
197204
}
198-
205+
resultFuture.complete(rowList);
199206
} else {
200207
throw new RuntimeException("not support cache obj type " + val.getType());
201208
}
@@ -206,7 +213,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
206213
//connect Cassandra
207214
connCassandraDB(cassandraSideTableInfo);
208215

209-
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere;
216+
String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
210217
System.out.println("sqlCondition:" + sqlCondition);
211218

212219
ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
@@ -231,14 +238,15 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
231238
cluster.closeAsync();
232239
if (rows.size() > 0) {
233240
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241+
List<Row> rowList = Lists.newArrayList();
234242
for (com.datastax.driver.core.Row line : rows) {
235243
Row row = fillData(input, line);
236244
if (openCache()) {
237245
cacheContent.add(line);
238246
}
239-
resultFuture.complete(Collections.singleton(row));
247+
rowList.add(row);
240248
}
241-
249+
resultFuture.complete(rowList);
242250
if (openCache()) {
243251
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
244252
}
@@ -285,7 +293,6 @@ public Row fillData(Row input, Object line) {
285293
}
286294
}
287295

288-
System.out.println("row:" + row.toString());
289296
return row;
290297
}
291298

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import com.dtstack.flink.sql.table.TableInfo;
2424
import com.dtstack.flink.sql.util.MathUtil;
2525

26+
import java.math.BigDecimal;
27+
import java.sql.Date;
28+
import java.sql.Timestamp;
2629
import java.util.Map;
2730
import java.util.regex.Matcher;
2831
import java.util.regex.Pattern;
@@ -96,4 +99,32 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9699

97100
private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
98101
}
102+
103+
public Class dbTypeConvertToJavaType(String fieldType) {
104+
switch (fieldType.toLowerCase()) {
105+
case "bigint":
106+
return Long.class;
107+
case "int":
108+
case "counter":
109+
return Integer.class;
110+
111+
case "text":
112+
case "inet":
113+
case "varchar":
114+
case "ascii":
115+
case "timeuuid":
116+
return String.class;
117+
118+
case "decimal":
119+
case "float":
120+
return Float.class;
121+
case "double":
122+
return Double.class;
123+
case "timestamp":
124+
return Timestamp.class;
125+
}
126+
127+
throw new RuntimeException("不支持 " + fieldType + " 类型");
128+
129+
}
99130
}

cassandra/cassandra-sink/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<configuration>
3434
<artifactSet>
3535
<excludes>
36-
36+
<exclude>org.slf4j</exclude>
3737
</excludes>
3838
</artifactSet>
3939
<filters>

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
import com.datastax.driver.core.SocketOptions;
4949
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5050
import com.datastax.driver.core.policies.RetryPolicy;
51+
import com.dtstack.flink.sql.config.DirtyConfig;
5152
import com.dtstack.flink.sql.metric.MetricConstant;
53+
import com.dtstack.flink.sql.sink.MetricOutputFormat;
5254
import org.apache.flink.api.common.io.RichOutputFormat;
5355
import org.apache.flink.api.common.typeinfo.TypeInformation;
5456
import org.apache.flink.api.java.tuple.Tuple;
@@ -73,7 +75,7 @@
7375
* @see Tuple
7476
* @see DriverManager
7577
*/
76-
public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
78+
public class CassandraOutputFormat extends MetricOutputFormat {
7779
private static final long serialVersionUID = -7994311331389155692L;
7880

7981
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -94,17 +96,9 @@ public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
9496
protected String[] fieldNames;
9597
TypeInformation<?>[] fieldTypes;
9698

97-
private int batchInterval = 5000;
98-
9999
private Cluster cluster;
100100
private Session session = null;
101101

102-
private int batchCount = 0;
103-
104-
private transient Counter outRecords;
105-
106-
private transient Meter outRecordsRate;
107-
108102
public CassandraOutputFormat() {
109103
}
110104

@@ -120,7 +114,9 @@ public void configure(Configuration parameters) {
120114
* I/O problem.
121115
*/
122116
@Override
123-
public void open(int taskNumber, int numTasks) throws IOException {
117+
public void open(int taskNumber, int numTasks) {
118+
initMetric();
119+
initDirtyDataOutputStream();
124120
try {
125121
if (session == null) {
126122
QueryOptions queryOptions = new QueryOptions();
@@ -183,10 +179,6 @@ public void open(int taskNumber, int numTasks) throws IOException {
183179
}
184180
}
185181

186-
private void initMetric() {
187-
outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
188-
outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
189-
}
190182

191183
/**
192184
* Adds a record to the prepared statement.
@@ -226,7 +218,7 @@ private void insertWrite(Row row) {
226218
resultSet.wasApplied();
227219
}
228220
} catch (Exception e) {
229-
LOG.error("[upsert] is error:" + e.getMessage());
221+
dealInsertError(row.toString(), e);
230222
}
231223
}
232224

@@ -237,7 +229,11 @@ private String buildSql(Row row) {
237229
if (row.getField(index) == null) {
238230
} else {
239231
fields.append(fieldNames[index] + ",");
240-
values.append("'" + row.getField(index) + "'" + ",");
232+
if (row.getField(index) instanceof String) {
233+
values.append("'" + row.getField(index) + "'" + ",");
234+
} else {
235+
values.append(row.getField(index) + ",");
236+
}
241237
}
242238
}
243239
fields.deleteCharAt(fields.length() - 1);
@@ -352,6 +348,12 @@ public CassandraFormatBuilder setPoolTimeoutMillis(Integer poolTimeoutMillis) {
352348
return this;
353349
}
354350

351+
public CassandraFormatBuilder setDirtyConfig(DirtyConfig dirtyConfig) {
352+
format.dirtyConfig = dirtyConfig;
353+
return this;
354+
}
355+
356+
355357
/**
356358
* Finalizes the configuration and checks validity.
357359
*

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.dtstack.flink.sql.sink.cassandra;
2121

2222

23+
import com.dtstack.flink.sql.config.DirtyConfig;
2324
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2425
import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
2526
import com.dtstack.flink.sql.table.TargetTableInfo;
@@ -57,6 +58,7 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
5758
protected Integer readTimeoutMillis;
5859
protected Integer connectTimeoutMillis;
5960
protected Integer poolTimeoutMillis;
61+
protected DirtyConfig dirtyConfig;
6062

6163
public CassandraSink() {
6264
// TO DO NOTHING
@@ -77,6 +79,7 @@ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
7779
this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
7880
this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
7981
this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
82+
this.dirtyConfig = cassandraTableInfo.getDirtyConfig();
8083
return this;
8184
}
8285

@@ -96,7 +99,9 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
9699
.setConnectTimeoutMillis(this.connectTimeoutMillis)
97100
.setPoolTimeoutMillis(this.poolTimeoutMillis)
98101
.setFieldNames(this.fieldNames)
99-
.setFieldTypes(this.fieldTypes);
102+
.setFieldTypes(this.fieldTypes)
103+
.setDirtyConfig(dirtyConfig);
104+
100105

101106
CassandraOutputFormat outputFormat = builder.finish();
102107
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);

0 commit comments

Comments
 (0)