|
48 | 48 | import io.dapr.client.domain.ConversationTools; |
49 | 49 | import io.dapr.client.domain.ConversationToolsFunction; |
50 | 50 | import io.dapr.client.domain.DaprMetadata; |
| 51 | +import io.dapr.client.domain.DecryptRequestAlpha1; |
51 | 52 | import io.dapr.client.domain.DeleteJobRequest; |
52 | 53 | import io.dapr.client.domain.DeleteStateRequest; |
53 | 54 | import io.dapr.client.domain.DropFailurePolicy; |
| 55 | +import io.dapr.client.domain.EncryptRequestAlpha1; |
54 | 56 | import io.dapr.client.domain.ExecuteStateTransactionRequest; |
55 | 57 | import io.dapr.client.domain.FailurePolicy; |
56 | 58 | import io.dapr.client.domain.FailurePolicyType; |
@@ -2071,4 +2073,194 @@ private AppConnectionPropertiesHealthMetadata getAppConnectionPropertiesHealth( |
2071 | 2073 | return new AppConnectionPropertiesHealthMetadata(healthCheckPath, healthProbeInterval, healthProbeTimeout, |
2072 | 2074 | healthThreshold); |
2073 | 2075 | } |
| 2076 | + |
| 2077 | + /** |
| 2078 | + * {@inheritDoc} |
| 2079 | + */ |
| 2080 | + @Override |
| 2081 | + public Flux<byte[]> encrypt(EncryptRequestAlpha1 request) { |
| 2082 | + try { |
| 2083 | + if (request == null) { |
| 2084 | + throw new IllegalArgumentException("EncryptRequestAlpha1 cannot be null."); |
| 2085 | + } |
| 2086 | + if (request.getComponentName() == null || request.getComponentName().trim().isEmpty()) { |
| 2087 | + throw new IllegalArgumentException("Component name cannot be null or empty."); |
| 2088 | + } |
| 2089 | + if (request.getKeyName() == null || request.getKeyName().trim().isEmpty()) { |
| 2090 | + throw new IllegalArgumentException("Key name cannot be null or empty."); |
| 2091 | + } |
| 2092 | + if (request.getKeyWrapAlgorithm() == null || request.getKeyWrapAlgorithm().trim().isEmpty()) { |
| 2093 | + throw new IllegalArgumentException("Key wrap algorithm cannot be null or empty."); |
| 2094 | + } |
| 2095 | + if (request.getPlainTextStream() == null) { |
| 2096 | + throw new IllegalArgumentException("Plaintext stream cannot be null."); |
| 2097 | + } |
| 2098 | + |
| 2099 | + return Flux.create(sink -> { |
| 2100 | + // Create response observer to receive encrypted data |
| 2101 | + StreamObserver<DaprProtos.EncryptResponse> responseObserver = new StreamObserver<DaprProtos.EncryptResponse>() { |
| 2102 | + @Override |
| 2103 | + public void onNext(DaprProtos.EncryptResponse response) { |
| 2104 | + if (response.hasPayload()) { |
| 2105 | + byte[] data = response.getPayload().getData().toByteArray(); |
| 2106 | + if (data.length > 0) { |
| 2107 | + sink.next(data); |
| 2108 | + } |
| 2109 | + } |
| 2110 | + } |
| 2111 | + |
| 2112 | + @Override |
| 2113 | + public void onError(Throwable t) { |
| 2114 | + sink.error(DaprException.propagate(new DaprException("ENCRYPT_ERROR", |
| 2115 | + "Error during encryption: " + t.getMessage(), t))); |
| 2116 | + } |
| 2117 | + |
| 2118 | + @Override |
| 2119 | + public void onCompleted() { |
| 2120 | + sink.complete(); |
| 2121 | + } |
| 2122 | + }; |
| 2123 | + |
| 2124 | + // Get the request stream observer from gRPC |
| 2125 | + StreamObserver<DaprProtos.EncryptRequest> requestObserver = |
| 2126 | + intercept(null, asyncStub).encryptAlpha1(responseObserver); |
| 2127 | + |
| 2128 | + // Build options for the first message |
| 2129 | + DaprProtos.EncryptRequestOptions.Builder optionsBuilder = DaprProtos.EncryptRequestOptions.newBuilder() |
| 2130 | + .setComponentName(request.getComponentName()) |
| 2131 | + .setKeyName(request.getKeyName()) |
| 2132 | + .setKeyWrapAlgorithm(request.getKeyWrapAlgorithm()); |
| 2133 | + |
| 2134 | + if (request.getDataEncryptionCipher() != null && !request.getDataEncryptionCipher().isEmpty()) { |
| 2135 | + optionsBuilder.setDataEncryptionCipher(request.getDataEncryptionCipher()); |
| 2136 | + } |
| 2137 | + optionsBuilder.setOmitDecryptionKeyName(request.isOmitDecryptionKeyName()); |
| 2138 | + if (request.getDecryptionKeyName() != null && !request.getDecryptionKeyName().isEmpty()) { |
| 2139 | + optionsBuilder.setDecryptionKeyName(request.getDecryptionKeyName()); |
| 2140 | + } |
| 2141 | + |
| 2142 | + final DaprProtos.EncryptRequestOptions options = optionsBuilder.build(); |
| 2143 | + final long[] sequenceNumber = {0}; |
| 2144 | + final boolean[] firstMessage = {true}; |
| 2145 | + |
| 2146 | + // Subscribe to the plaintext stream and send chunks |
| 2147 | + request.getPlainTextStream() |
| 2148 | + .doOnNext(chunk -> { |
| 2149 | + DaprProtos.EncryptRequest.Builder reqBuilder = DaprProtos.EncryptRequest.newBuilder() |
| 2150 | + .setPayload(CommonProtos.StreamPayload.newBuilder() |
| 2151 | + .setData(ByteString.copyFrom(chunk)) |
| 2152 | + .setSeq(sequenceNumber[0]++) |
| 2153 | + .build()); |
| 2154 | + |
| 2155 | + // Include options only in the first message |
| 2156 | + if (firstMessage[0]) { |
| 2157 | + reqBuilder.setOptions(options); |
| 2158 | + firstMessage[0] = false; |
| 2159 | + } |
| 2160 | + |
| 2161 | + requestObserver.onNext(reqBuilder.build()); |
| 2162 | + }) |
| 2163 | + .doOnError(error -> { |
| 2164 | + requestObserver.onError(error); |
| 2165 | + sink.error(DaprException.propagate(new DaprException("ENCRYPT_ERROR", |
| 2166 | + "Error reading plaintext stream: " + error.getMessage(), error))); |
| 2167 | + }) |
| 2168 | + .doOnComplete(() -> { |
| 2169 | + requestObserver.onCompleted(); |
| 2170 | + }) |
| 2171 | + .subscribe(); |
| 2172 | + }); |
| 2173 | + } catch (Exception ex) { |
| 2174 | + return DaprException.wrapFlux(ex); |
| 2175 | + } |
| 2176 | + } |
| 2177 | + |
| 2178 | + /** |
| 2179 | + * {@inheritDoc} |
| 2180 | + */ |
| 2181 | + @Override |
| 2182 | + public Flux<byte[]> decrypt(DecryptRequestAlpha1 request) { |
| 2183 | + try { |
| 2184 | + if (request == null) { |
| 2185 | + throw new IllegalArgumentException("DecryptRequestAlpha1 cannot be null."); |
| 2186 | + } |
| 2187 | + if (request.getComponentName() == null || request.getComponentName().trim().isEmpty()) { |
| 2188 | + throw new IllegalArgumentException("Component name cannot be null or empty."); |
| 2189 | + } |
| 2190 | + if (request.getCipherTextStream() == null) { |
| 2191 | + throw new IllegalArgumentException("Ciphertext stream cannot be null."); |
| 2192 | + } |
| 2193 | + |
| 2194 | + return Flux.create(sink -> { |
| 2195 | + // Create response observer to receive decrypted data |
| 2196 | + StreamObserver<DaprProtos.DecryptResponse> responseObserver = new StreamObserver<DaprProtos.DecryptResponse>() { |
| 2197 | + @Override |
| 2198 | + public void onNext(DaprProtos.DecryptResponse response) { |
| 2199 | + if (response.hasPayload()) { |
| 2200 | + byte[] data = response.getPayload().getData().toByteArray(); |
| 2201 | + if (data.length > 0) { |
| 2202 | + sink.next(data); |
| 2203 | + } |
| 2204 | + } |
| 2205 | + } |
| 2206 | + |
| 2207 | + @Override |
| 2208 | + public void onError(Throwable t) { |
| 2209 | + sink.error(DaprException.propagate(new DaprException("DECRYPT_ERROR", |
| 2210 | + "Error during decryption: " + t.getMessage(), t))); |
| 2211 | + } |
| 2212 | + |
| 2213 | + @Override |
| 2214 | + public void onCompleted() { |
| 2215 | + sink.complete(); |
| 2216 | + } |
| 2217 | + }; |
| 2218 | + |
| 2219 | + // Get the request stream observer from gRPC |
| 2220 | + StreamObserver<DaprProtos.DecryptRequest> requestObserver = |
| 2221 | + intercept(null, asyncStub).decryptAlpha1(responseObserver); |
| 2222 | + |
| 2223 | + // Build options for the first message |
| 2224 | + DaprProtos.DecryptRequestOptions.Builder optionsBuilder = DaprProtos.DecryptRequestOptions.newBuilder() |
| 2225 | + .setComponentName(request.getComponentName()); |
| 2226 | + |
| 2227 | + if (request.getKeyName() != null && !request.getKeyName().isEmpty()) { |
| 2228 | + optionsBuilder.setKeyName(request.getKeyName()); |
| 2229 | + } |
| 2230 | + |
| 2231 | + final DaprProtos.DecryptRequestOptions options = optionsBuilder.build(); |
| 2232 | + final long[] sequenceNumber = {0}; |
| 2233 | + final boolean[] firstMessage = {true}; |
| 2234 | + |
| 2235 | + // Subscribe to the ciphertext stream and send chunks |
| 2236 | + request.getCipherTextStream() |
| 2237 | + .doOnNext(chunk -> { |
| 2238 | + DaprProtos.DecryptRequest.Builder reqBuilder = DaprProtos.DecryptRequest.newBuilder() |
| 2239 | + .setPayload(CommonProtos.StreamPayload.newBuilder() |
| 2240 | + .setData(ByteString.copyFrom(chunk)) |
| 2241 | + .setSeq(sequenceNumber[0]++) |
| 2242 | + .build()); |
| 2243 | + |
| 2244 | + // Include options only in the first message |
| 2245 | + if (firstMessage[0]) { |
| 2246 | + reqBuilder.setOptions(options); |
| 2247 | + firstMessage[0] = false; |
| 2248 | + } |
| 2249 | + |
| 2250 | + requestObserver.onNext(reqBuilder.build()); |
| 2251 | + }) |
| 2252 | + .doOnError(error -> { |
| 2253 | + requestObserver.onError(error); |
| 2254 | + sink.error(DaprException.propagate(new DaprException("DECRYPT_ERROR", |
| 2255 | + "Error reading ciphertext stream: " + error.getMessage(), error))); |
| 2256 | + }) |
| 2257 | + .doOnComplete(() -> { |
| 2258 | + requestObserver.onCompleted(); |
| 2259 | + }) |
| 2260 | + .subscribe(); |
| 2261 | + }); |
| 2262 | + } catch (Exception ex) { |
| 2263 | + return DaprException.wrapFlux(ex); |
| 2264 | + } |
| 2265 | + } |
2074 | 2266 | } |
0 commit comments