4141import java .util .List ;
4242import java .util .Map ;
4343import java .util .Set ;
44+ import java .util .regex .Matcher ;
45+ import java .util .regex .Pattern ;
4446
4547/**
4648 * @author yanxi
4749 */
48- public class RedisOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
50+ public class RedisOutputFormat extends AbstractDtRichOutputFormat <Tuple2 < Boolean , Row > > {
4951 private static final Logger LOG = LoggerFactory .getLogger (RedisOutputFormat .class );
5052
53+ private static final Pattern HOST_PORT_PATTERN = Pattern .compile ("(?<host>(.*)):(?<port>\\ d+)*" );
54+
5155 protected String [] fieldNames ;
5256
5357 protected TypeInformation <?>[] fieldTypes ;
@@ -82,8 +86,6 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
8286
8387 private JedisSentinelPool jedisSentinelPool ;
8488
85- private GenericObjectPoolConfig poolConfig ;
86-
8789 private RedisOutputFormat () {
8890 }
8991
@@ -117,17 +119,27 @@ private GenericObjectPoolConfig setPoolConfig(String maxTotal, String maxIdle, S
117119 }
118120
119121 private void establishConnection () {
120- poolConfig = setPoolConfig (maxTotal , maxIdle , minIdle );
122+ GenericObjectPoolConfig poolConfig = setPoolConfig (maxTotal , maxIdle , minIdle );
121123 String [] nodes = StringUtils .split (url , "," );
122124 String [] firstIpPort = StringUtils .split (nodes [0 ], ":" );
123125 String firstIp = firstIpPort [0 ];
124126 String firstPort = firstIpPort [1 ];
125127 Set <HostAndPort > addresses = new HashSet <>();
126128 Set <String > ipPorts = new HashSet <>();
127- for (String ipPort : nodes ) {
128- ipPorts .add (ipPort );
129- String [] ipPortPair = StringUtils .split (ipPort , ":" );
130- addresses .add (new HostAndPort (ipPortPair [0 ].trim (), Integer .parseInt (ipPortPair [1 ].trim ())));
129+
130+ // 对ipv6 支持
131+ for (String node : nodes ) {
132+ ipPorts .add (node );
133+ Matcher matcher = HOST_PORT_PATTERN .matcher (node );
134+ if (matcher .find ()) {
135+ String host = matcher .group ("host" ).trim ();
136+ String portStr = matcher .group ("port" ).trim ();
137+ if (StringUtils .isNotBlank (host ) && StringUtils .isNotBlank (portStr )) {
138+ // 转化为int格式的端口
139+ int port = Integer .parseInt (portStr );
140+ addresses .add (new HostAndPort (host , port ));
141+ }
142+ }
131143 }
132144
133145 switch (RedisType .parse (redisType )) {
@@ -148,13 +160,12 @@ private void establishConnection() {
148160 }
149161
150162 @ Override
151- public void writeRecord (Tuple2 record ) throws IOException {
152- Tuple2 <Boolean , Row > tupleTrans = record ;
153- Boolean retract = tupleTrans .getField (0 );
163+ public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
164+ Boolean retract = record .getField (0 );
154165 if (!retract ) {
155166 return ;
156167 }
157- Row row = tupleTrans .getField (1 );
168+ Row row = record .getField (1 );
158169 if (row .getArity () != fieldNames .length ) {
159170 return ;
160171 }
0 commit comments