Skip to content

Commit f4b1b65

Browse files
feat(filesystems): add new SMB filesystem
1 parent fcfaa19 commit f4b1b65

File tree

13 files changed

+1459
-0
lines changed

13 files changed

+1459
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
SPDX-License-Identifier: Apache-2.0
4+
Copyright (c) StreamThoughts
5+
6+
Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
7+
-->
8+
<project xmlns="http://maven.apache.org/POM/4.0.0"
9+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
10+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
11+
<parent>
12+
<groupId>io.streamthoughts</groupId>
13+
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
14+
<version>2.17.0-SNAPSHOT</version>
15+
</parent>
16+
<modelVersion>4.0.0</modelVersion>
17+
18+
<name>Kafka Connect Source File Pulse SMB FS</name>
19+
<artifactId>kafka-connect-filepulse-smb-fs</artifactId>
20+
21+
<properties>
22+
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
23+
<license.header.file>${project.parent.basedir}/../header</license.header.file>
24+
<jcifs.version>2.1.40</jcifs.version>
25+
<mockito-junit-jupiter.version>5.5.0</mockito-junit-jupiter.version>
26+
<assertj-core.version>3.26.3</assertj-core.version>
27+
<lombok.version>1.18.34</lombok.version>
28+
</properties>
29+
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.codelibs</groupId>
33+
<artifactId>jcifs</artifactId>
34+
<version>${jcifs.version}</version>
35+
</dependency>
36+
<dependency>
37+
<groupId>io.streamthoughts</groupId>
38+
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
39+
<version>${project.version}</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.commons</groupId>
43+
<artifactId>commons-compress</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.avro</groupId>
47+
<artifactId>avro</artifactId>
48+
</dependency>
49+
<!-- Test Dependencies-->
50+
<dependency>
51+
<groupId>org.mockito</groupId>
52+
<artifactId>mockito-junit-jupiter</artifactId>
53+
<version>${mockito-junit-jupiter.version}</version>
54+
<scope>test</scope>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.assertj</groupId>
58+
<artifactId>assertj-core</artifactId>
59+
<version>${assertj-core.version}</version>
60+
<scope>test</scope>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.projectlombok</groupId>
64+
<artifactId>lombok</artifactId>
65+
<version>${lombok.version}</version>
66+
<scope>test</scope>
67+
</dependency>
68+
</dependencies>
69+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) StreamThoughts
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.kafka.connect.filepulse.fs;
8+
9+
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
10+
import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
11+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
12+
import java.io.InputStream;
13+
import java.net.URI;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
/**
18+
* Implementation of {@link Storage} for SMB/CIFS file systems.
19+
*/
20+
public class SmbFileStorage implements Storage {
21+
22+
private static final Logger LOG = LoggerFactory.getLogger(SmbFileStorage.class);
23+
24+
private final SmbClient smbClient;
25+
26+
public SmbFileStorage(SmbFileSystemListingConfig config) {
27+
this.smbClient = new SmbClient(config);
28+
}
29+
30+
SmbFileStorage(SmbClient smbClient) {
31+
this.smbClient = smbClient;
32+
}
33+
34+
/**
35+
* {@inheritDoc}
36+
*/
37+
@Override
38+
public FileObjectMeta getObjectMetadata(URI uri) {
39+
LOG.debug("Getting object metadata for '{}'", uri);
40+
try {
41+
return smbClient.getObjectMetadata(uri);
42+
} catch (Exception e) {
43+
throw new ConnectFilePulseException(String.format("Cannot stat file with uri: %s", uri), e);
44+
}
45+
}
46+
47+
/**
48+
* {@inheritDoc}
49+
*/
50+
@Override
51+
public boolean exists(URI uri) {
52+
LOG.debug("Checking if '{}' exists", uri);
53+
try {
54+
return smbClient.exists(uri);
55+
} catch (Exception e) {
56+
throw new ConnectFilePulseException(
57+
String.format("Failed to check if SMB file exists: %s", uri), e);
58+
}
59+
}
60+
61+
/**
62+
* {@inheritDoc}
63+
*/
64+
@Override
65+
public boolean delete(URI uri) {
66+
LOG.info("Deleting '{}'", uri);
67+
try {
68+
return smbClient.delete(uri);
69+
} catch (Exception e) {
70+
LOG.error("Failed to delete SMB file: {}", uri, e);
71+
throw new ConnectFilePulseException("Failed to delete SMB file: " + uri, e);
72+
}
73+
}
74+
75+
/**
76+
* {@inheritDoc}
77+
*/
78+
@Override
79+
public boolean move(URI source, URI dest) {
80+
LOG.info("Moving '{}' to '{}'", source, dest);
81+
try {
82+
return smbClient.move(source, dest);
83+
} catch (Exception e) {
84+
LOG.error("Failed to move SMB file from {} to {}", source, dest, e);
85+
throw new ConnectFilePulseException(
86+
String.format("Failed to move SMB file from %s to %s", source, dest), e);
87+
}
88+
}
89+
90+
/**
91+
* {@inheritDoc}
92+
*/
93+
@Override
94+
public InputStream getInputStream(URI uri) {
95+
LOG.debug("Getting input stream for '{}'", uri);
96+
try {
97+
return smbClient.getInputStream(uri);
98+
} catch (Exception e) {
99+
LOG.error("Failed to get input stream for SMB file: {}", uri, e);
100+
throw new ConnectFilePulseException("Failed to get input stream for SMB file: " + uri, e);
101+
}
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) StreamThoughts
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.kafka.connect.filepulse.fs;
8+
9+
import io.streamthoughts.kafka.connect.filepulse.fs.client.SmbClient;
10+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
11+
import java.util.Collection;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Objects;
16+
import java.util.stream.Collectors;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
/**
21+
* Implementation of {@link FileSystemListing} for SMB/CIFS file systems.
22+
*/
23+
public class SmbFileSystemListing implements FileSystemListing<SmbFileStorage> {
24+
25+
private static final Logger LOG = LoggerFactory.getLogger(SmbFileSystemListing.class);
26+
27+
private FileListFilter filter;
28+
private SmbFileSystemListingConfig config;
29+
private SmbClient smbClient;
30+
31+
public SmbFileSystemListing(final List<FileListFilter> filters) {
32+
Objects.requireNonNull(filters, "filters can't be null");
33+
this.filter = new CompositeFileListFilter(filters);
34+
}
35+
36+
@SuppressWarnings("unused")
37+
public SmbFileSystemListing() {
38+
this(Collections.emptyList());
39+
}
40+
41+
/**
42+
* {@inheritDoc}
43+
*/
44+
@Override
45+
public void configure(final Map<String, ?> configs) {
46+
LOG.debug("Configuring SmbFilesystemListing");
47+
config = new SmbFileSystemListingConfig(configs);
48+
smbClient = new SmbClient(config);
49+
}
50+
51+
/**
52+
* {@inheritDoc}
53+
*/
54+
@Override
55+
public Collection<FileObjectMeta> listObjects() {
56+
String listingDirectoryPath = getConfig().getSmbDirectoryPath();
57+
58+
LOG.info("Listing SMB files in directory: smb://{}/{}{}",
59+
getConfig().getSmbHost(),
60+
getConfig().getSmbShare(),
61+
listingDirectoryPath);
62+
63+
List<FileObjectMeta> filesMetadata = getSmbClient()
64+
.listFiles(listingDirectoryPath)
65+
.collect(Collectors.toList());
66+
67+
LOG.info("Found {} files in SMB directory before filtering", filesMetadata.size());
68+
69+
Collection<FileObjectMeta> filteredFiles = filter.filterFiles(filesMetadata);
70+
71+
LOG.info("Returning {} files after filtering", filteredFiles.size());
72+
73+
return filteredFiles;
74+
}
75+
76+
/**
77+
* {@inheritDoc}
78+
*/
79+
@Override
80+
public void setFilter(FileListFilter filter) {
81+
this.filter = filter;
82+
}
83+
84+
/**
85+
* {@inheritDoc}
86+
*/
87+
@Override
88+
public SmbFileStorage storage() {
89+
return new SmbFileStorage(config);
90+
}
91+
92+
SmbClient getSmbClient() {
93+
return smbClient;
94+
}
95+
96+
SmbFileSystemListingConfig getConfig() {
97+
return config;
98+
}
99+
}

0 commit comments

Comments
 (0)