Commit 1813d74

Merge branch 'revert-ae75fae6' into 'v1.8.0_dev'

Revert "Merge branch 'v1.8.0_dev_cassandra' into 'v1.8.0_dev'"

This reverts merge request !155

See merge request !157

2 parents 6a62b73 + ada82f0, commit 1813d74

7 files changed: +32 -77 lines changed
cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@
         <configuration>
           <artifactSet>
             <excludes>
-              <exclude>org.slf4j</exclude>
+
             </excludes>
           </artifactSet>
           <filters>

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@
         <configuration>
           <artifactSet>
             <excludes>
-              <exclude>org.slf4j</exclude>
+
             </excludes>
           </artifactSet>
           <filters>

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 12 additions & 19 deletions
@@ -174,16 +174,9 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                 return;
             }
             inputParams.add(equalObj);
-            StringBuffer sqlTemp = stringBuffer.append(sideInfo.getEqualFieldList().get(i))
-                    .append(" = ");
-            if (equalObj instanceof String) {
-                sqlTemp.append("'" + equalObj + "'")
-                        .append(" and ");
-            } else {
-                sqlTemp.append(equalObj)
-                        .append(" and ");
-            }
-
+            stringBuffer.append(sideInfo.getEqualFieldList().get(i))
+                    .append(" = ").append("'" + equalObj + "'")
+                    .append(" and ");
         }

         String key = buildCacheKey(inputParams);
@@ -197,12 +190,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                 dealMissKey(input, resultFuture);
                 return;
             } else if (ECacheContentType.MultiLine == val.getType()) {
-                List<Row> rowList = Lists.newArrayList();
-                for (Object jsonArray : (List) val.getContent()) {
-                    Row row = fillData(input, jsonArray);
-                    rowList.add(row);
+
+                for (Object rowArray : (List) val.getContent()) {
+                    Row row = fillData(input, rowArray);
+                    resultFuture.complete(Collections.singleton(row));
                 }
-                resultFuture.complete(rowList);
+
             } else {
                 throw new RuntimeException("not support cache obj type " + val.getType());
             }
@@ -213,7 +206,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         //connect Cassandra
         connCassandraDB(cassandraSideTableInfo);

-        String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
+        String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere;
         System.out.println("sqlCondition:" + sqlCondition);

         ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
@@ -238,15 +231,14 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                         cluster.closeAsync();
                         if (rows.size() > 0) {
                             List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                            List<Row> rowList = Lists.newArrayList();
                             for (com.datastax.driver.core.Row line : rows) {
                                 Row row = fillData(input, line);
                                 if (openCache()) {
                                     cacheContent.add(line);
                                 }
-                                rowList.add(row);
+                                resultFuture.complete(Collections.singleton(row));
                             }
-                            resultFuture.complete(rowList);
+
                             if (openCache()) {
                                 putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                             }
@@ -293,6 +285,7 @@ public Row fillData(Row input, Object line) {
             }
         }

+        System.out.println("row:" + row.toString());
         return row;
     }

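For context, here is a minimal, self-contained sketch of the behavioural difference in the key-condition code above: the restored version wraps every join value in single quotes, while the removed version quoted only String values and inlined everything else. Class, method, and variable names in the sketch are illustrative, not taken from the project.

// WhereClauseSketch.java -- illustrative only; names are not from the project.
import java.util.Arrays;
import java.util.List;

public class WhereClauseSketch {

    // Restored behaviour (this commit): every join value is wrapped in single quotes.
    static String buildAlwaysQuoted(List<String> fields, List<Object> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            sb.append(fields.get(i)).append(" = ")
              .append("'").append(values.get(i)).append("'")
              .append(" and ");
        }
        return sb.toString();
    }

    // Reverted behaviour: only String values were quoted, other types were inlined as-is.
    static String buildTypeAware(List<String> fields, List<Object> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            Object v = values.get(i);
            sb.append(fields.get(i)).append(" = ")
              .append(v instanceof String ? "'" + v + "'" : String.valueOf(v))
              .append(" and ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "name");
        List<Object> values = Arrays.asList(42, "alice");
        System.out.println(buildAlwaysQuoted(fields, values)); // id = '42' and name = 'alice' and
        System.out.println(buildTypeAware(fields, values));    // id = 42 and name = 'alice' and
    }
}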

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 0 additions & 31 deletions
@@ -23,9 +23,6 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.MathUtil;

-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -99,32 +96,4 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O

     private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
     }
-
-    public Class dbTypeConvertToJavaType(String fieldType) {
-        switch (fieldType.toLowerCase()) {
-            case "bigint":
-                return Long.class;
-            case "int":
-            case "counter":
-                return Integer.class;
-
-            case "text":
-            case "inet":
-            case "varchar":
-            case "ascii":
-            case "timeuuid":
-                return String.class;
-
-            case "decimal":
-            case "float":
-                return Float.class;
-            case "double":
-                return Double.class;
-            case "timestamp":
-                return Timestamp.class;
-        }
-
-        throw new RuntimeException("不支持 " + fieldType + " 类型");
-
-    }
 }

cassandra/cassandra-sink/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
         <configuration>
           <artifactSet>
             <excludes>
-              <exclude>org.slf4j</exclude>
+
             </excludes>
          </artifactSet>
          <filters>

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 16 additions & 18 deletions
@@ -48,9 +48,7 @@
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.config.DirtyConfig;
 import com.dtstack.flink.sql.metric.MetricConstant;
-import com.dtstack.flink.sql.sink.MetricOutputFormat;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -75,7 +73,7 @@
  * @see Tuple
  * @see DriverManager
  */
-public class CassandraOutputFormat extends MetricOutputFormat {
+public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
     private static final long serialVersionUID = -7994311331389155692L;

     private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -96,9 +94,17 @@ public class CassandraOutputFormat extends MetricOutputFormat {
     protected String[] fieldNames;
     TypeInformation<?>[] fieldTypes;

+    private int batchInterval = 5000;
+
     private Cluster cluster;
     private Session session = null;

+    private int batchCount = 0;
+
+    private transient Counter outRecords;
+
+    private transient Meter outRecordsRate;
+
     public CassandraOutputFormat() {
     }

@@ -114,9 +120,7 @@ public void configure(Configuration parameters) {
      * I/O problem.
      */
     @Override
-    public void open(int taskNumber, int numTasks) {
-        initMetric();
-        initDirtyDataOutputStream();
+    public void open(int taskNumber, int numTasks) throws IOException {
         try {
             if (session == null) {
                 QueryOptions queryOptions = new QueryOptions();
@@ -179,6 +183,10 @@ public void open(int taskNumber, int numTasks) {
         }
     }

+    private void initMetric() {
+        outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
+        outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
+    }

     /**
      * Adds a record to the prepared statement.
@@ -218,7 +226,7 @@ private void insertWrite(Row row) {
                 resultSet.wasApplied();
             }
         } catch (Exception e) {
-            dealInsertError(row.toString(), e);
+            LOG.error("[upsert] is error:" + e.getMessage());
         }
     }

@@ -229,11 +237,7 @@ private String buildSql(Row row) {
             if (row.getField(index) == null) {
             } else {
                 fields.append(fieldNames[index] + ",");
-                if (row.getField(index) instanceof String) {
-                    values.append("'" + row.getField(index) + "'" + ",");
-                } else {
-                    values.append(row.getField(index) + ",");
-                }
+                values.append("'" + row.getField(index) + "'" + ",");
             }
         }
         fields.deleteCharAt(fields.length() - 1);
@@ -348,12 +352,6 @@ public CassandraFormatBuilder setPoolTimeoutMillis(Integer poolTimeoutMillis) {
             return this;
         }

-        public CassandraFormatBuilder setDirtyConfig(DirtyConfig dirtyConfig) {
-            format.dirtyConfig = dirtyConfig;
-            return this;
-        }
-
-
         /**
          * Finalizes the configuration and checks validity.
          *
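The hunks above also reintroduce a private initMetric() that registers the output-record counter and rate meter directly on the format's metric group. Below is a hedged, self-contained sketch of that registration pattern against a plain Flink RichOutputFormat; the metric name strings and the record type are placeholders rather than the project's MetricConstant values, and this is a sketch of the idea, not the project's class.

// MetricsSketch.java -- illustrative sketch of metric registration in a RichOutputFormat.
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.types.Row;

import java.io.IOException;

public class MetricsSketch extends RichOutputFormat<Row> {

    private transient Counter outRecords;
    private transient Meter outRecordsRate;

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Register a counter for emitted records and a meter that averages it over
        // a 20-second window, mirroring the restored initMetric() shown in the diff.
        outRecords = getRuntimeContext().getMetricGroup().counter("numRecordsOut");
        outRecordsRate = getRuntimeContext().getMetricGroup()
                .meter("numRecordsOutRate", new MeterView(outRecords, 20));
    }

    @Override
    public void writeRecord(Row record) throws IOException {
        // A real sink would write to Cassandra here; the sketch only bumps the
        // counter so the meter has something to average.
        outRecords.inc();
    }

    @Override
    public void close() throws IOException {
        // no resources to release in this sketch
    }
}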

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 1 addition & 6 deletions
@@ -20,7 +20,6 @@
 package com.dtstack.flink.sql.sink.cassandra;


-import com.dtstack.flink.sql.config.DirtyConfig;
 import com.dtstack.flink.sql.sink.IStreamSinkGener;
 import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
 import com.dtstack.flink.sql.table.TargetTableInfo;
@@ -58,7 +57,6 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
     protected Integer readTimeoutMillis;
     protected Integer connectTimeoutMillis;
     protected Integer poolTimeoutMillis;
-    protected DirtyConfig dirtyConfig;

     public CassandraSink() {
         // TO DO NOTHING
@@ -79,7 +77,6 @@ public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
         this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
         this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
         this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
-        this.dirtyConfig = cassandraTableInfo.getDirtyConfig();
         return this;
     }

@@ -99,9 +96,7 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                 .setConnectTimeoutMillis(this.connectTimeoutMillis)
                 .setPoolTimeoutMillis(this.poolTimeoutMillis)
                 .setFieldNames(this.fieldNames)
-                .setFieldTypes(this.fieldTypes)
-                .setDirtyConfig(dirtyConfig);
-
+                .setFieldTypes(this.fieldTypes);

         CassandraOutputFormat outputFormat = builder.finish();
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
