Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ concurrency:

jobs:
earthly:
if: true
name: Earthly ci
runs-on: ubuntu-latest
permissions: write-all
Expand Down Expand Up @@ -73,10 +74,10 @@ jobs:

- name: Setup FDB
run:
curl -L "https://github.com/apple/foundationdb/releases/download/7.1.31/foundationdb-clients_7.1.31-1_amd64.deb" --output /tmp/foundationdb-clients_7.1.31-1_amd64.deb --fail;
sudo dpkg -i /tmp/foundationdb-clients_7.1.31-1_amd64.deb;
sudo rm -f /tmp/foundationdb-clients_7.1.31-1_amd64.deb;
sudo curl --fail -L "https://github.com/apple/foundationdb/releases/download/7.1.31/libfdb_c.x86_64.so" --output "/usr/lib/libfdb_c.7.1.31.x86_64.so";
curl -L "https://github.com/apple/foundationdb/releases/download/7.3.57/foundationdb-clients_7.3.57-1_amd64.deb" --output /tmp/foundationdb-clients_7.3.57-1_amd64.deb --fail;
sudo dpkg -i /tmp/foundationdb-clients_7.3.57-1_amd64.deb;
sudo rm -f /tmp/foundationdb-clients_7.3.57-1_amd64.deb;
sudo curl --fail -L "https://github.com/apple/foundationdb/releases/download/7.3.57/libfdb_c.x86_64.so" --output "/usr/lib/libfdb_c.7.3.57.x86_64.so";

- name: Setup docker compose
id: compose
Expand Down
11 changes: 10 additions & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,13 @@ $ JAVA_OPTS="-DFDB_LIBRARY_PATH_FDB_C=/usr/local/lib/libfdb_c.dylib -DFDB_LIBRAR
(.invoke method com.apple.foundationdb.JNIUtil (object-array ["fdb_java"]))
(.invoke method com.apple.foundationdb.JNIUtil (object-array ["fdb_c"])))

```
```

# Telemetry

# Get the OT javaagent

```shell
wget --content-disposition https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.21.0/opentelemetry-javaagent.jar
```
Run with the `dev` profile to activate the java agent.
7 changes: 3 additions & 4 deletions Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
CACHE ~/.m2
RUN clj -Stree
RUN npm install
#RUN wget https://github.com/apple/foundationdb/releases/download/7.3.63/foundationdb-clients_7.3.63-1_aarch64.deb
#RUN dpkg -i foundationdb-clients_7.3.63-1_aarch64.deb
RUN wget -q https://github.com/apple/foundationdb/releases/download/7.1.31/foundationdb-clients_7.1.31-1_amd64.deb
RUN dpkg -i foundationdb-clients_7.1.31-1_amd64.deb
RUN wget -nv https://github.com/apple/foundationdb/releases/download/7.3.57/foundationdb-clients_7.3.57-1_amd64.deb
RUN dpkg -i foundationdb-clients_7.3.57-1_amd64.deb
RUN echo "docker:docker@127.0.0.1:4500" > /etc/foundationdb/fdb.cluster
RUN wget -nv --content-disposition https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

build-base:
FROM +deps
Expand Down Expand Up @@ -66,7 +65,7 @@
COPY docker ./docker
COPY docker-compose.yaml ./
WITH DOCKER --compose docker-compose.yaml
RUN bin/run-coverage

Check failure on line 68 in Earthfile

View workflow job for this annotation

GitHub Actions / Earthly ci

Error

The command WITH DOCKER RUN --privileged bin/run-coverage did not complete successfully. Exit code 1
END
SAVE ARTIFACT ./coverage AS LOCAL ./coverage
SAVE ARTIFACT ./target AS LOCAL ./target
Expand Down
2 changes: 1 addition & 1 deletion bin/kaocha
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[ -d "node_modules/ws" ] || npm install ws

JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Dio.netty.tryUnsafe=false"
JAVA_OPTS="${JAVA_OPTS:--agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Dio.netty.tryUnsafe=false}"

if [[ "$OSTYPE" == "darwin"* ]]; then
FDB_OPTS="-J-DFDB_LIBRARY_PATH_FDB_C=/usr/local/lib/libfdb_c.dylib -J-DFDB_LIBRARY_PATH_FDB_JAVA=/usr/local/lib/libfdb_java.jnilib"
Expand Down
15 changes: 0 additions & 15 deletions build/build.clj
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
(ns build
(:require [clojure.tools.build.api :as b]
[clojure.pprint :as pprint]))

(def base-nses ['intemporal.workflow
'intemporal.store.internal
'intemporal.store.foundationdb
'intemporal.store
'intemporal.macros
'intemporal.workflow.internal
'intemporal.store.jdbc])

(def dev-nses ['intemporal.demo-parallelism
'intemporal.demo-recovery
'intemporal.demo-saga
'intemporal.demo-vthread-recovery
'intemporal.demo-workflow])

;; clj -T:build compile-main
(defn compile-main [opts]
(b/delete {:path "target/classes"})
Expand Down
34 changes: 23 additions & 11 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
{:paths ["src" "target"]
:test-paths ["test"]

:deps {org.clojure/clojure {:mvn/version "1.12.1"}
thheller/shadow-cljs {:mvn/version "2.28.11"}
com.taoensso/telemere {:mvn/version "1.1.0"}
com.taoensso/nippy {:mvn/version "3.6.0"}
:deps {org.clojure/clojure {:mvn/version "1.12.1"}
thheller/shadow-cljs {:mvn/version "2.28.11"}
com.taoensso/telemere {:mvn/version "1.1.0"}
com.taoensso/nippy {:mvn/version "3.6.0"}
com.github.steffan-westcott/clj-otel-api {:mvn/version "0.2.10"}
;;stuff
net.cgrand/macrovich {:mvn/version "0.2.2"}
funcool/promesa {:mvn/version "11.0.678"}
metosin/malli {:mvn/version "0.19.1"}
thedavidmeister/cljc-md5 {:mvn/version "0.0.2"}
missionary/missionary {:mvn/version "b.46"}}
net.cgrand/macrovich {:mvn/version "0.2.2"}
funcool/promesa {:mvn/version "11.0.678"}
metosin/malli {:mvn/version "0.19.1"}
thedavidmeister/cljc-md5 {:mvn/version "0.0.2"}
missionary/missionary {:mvn/version "b.46"}}

:aliases {:dev {:extra-paths ["dev" "test"]
:jvm-opts ["-Djdk.attach.allowAttachSelf"]
:_jvm-opts ["-Djdk.attach.allowAttachSelf"
"-javaagent:./opentelemetry-javaagent.jar"
"-Dotel.instrumentation.common.default-enabled=true"
"-Dotel.javaagent.debug=false"
"-Dotel.exporter.otlp.protocol=grpc"
"-Dotel.exporter.otlp.endpoint=http://localhost:4317"
"-Dotel.instrumentation.netty.enabled=false"
"-Dotel.metrics.exporter=none"
"-Dotel.logs.exporter=none"
"-Dotel.javaagent.debug=false"
"-Dotel.resource.attributes=service.name=intemporal"]

:extra-deps {exoscale/automata {:mvn/version "0.1.10"}
lambdaisland/kaocha {:mvn/version "1.91.1392"}
lambdaisland/kaocha-cloverage {:mvn/version "1.1.89"}
Expand All @@ -24,11 +35,12 @@
tortue/spy {:mvn/version "2.15.0"}
nubank/matcher-combinators {:mvn/version "3.9.2"}
com.clojure-goes-fast/clj-async-profiler {:mvn/version "1.6.2"}}}
:fdb {:extra-deps {org.foundationdb/fdb-java {:mvn/version "7.1.60"}
:fdb {:extra-deps {org.foundationdb/fdb-java {:mvn/version "7.3.62"}
me.vedang/clj-fdb {:mvn/version "0.3.0"}}}

:jdbc {:extra-deps {com.github.seancorfield/next.jdbc {:mvn/version "1.3.1048"}
org.postgresql/postgresql {:mvn/version "42.7.7"}
hikari-cp/hikari-cp {:mvn/version "3.3.0"}
migratus/migratus {:mvn/version "1.6.4"}}}
:doc {:extra-paths ["doc"]}
:cljs {:extra-deps {org.clojure/clojurescript {:mvn/version "1.12.42"}
Expand Down
5 changes: 2 additions & 3 deletions dev/intemporal/demo_parallelism.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
(ns intemporal.demo-parallelism
(:require [intemporal.store :as store]
[intemporal.workflow :as w]
[promesa.core :as p])
(:require [intemporal.macros :refer [stub-protocol defn-workflow vthread]]
[intemporal.workflow]))
[intemporal.macros :refer [stub-protocol defn-workflow vthread]]
[promesa.core :as p]))

;;;;
;; demo
Expand Down
6 changes: 5 additions & 1 deletion dev/intemporal/vthread-recovery.edn
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
:result nil,
:id "silly-mcclintock",
:order 1,
:lease-end nil}},
:lease-end nil
:runtime {:timeout-ms 900000,
:telemetry-context {"traceparent" "00-0f17ae74e434659106c06f591ee56a6d-85f973ca22cd0d92-01"}}}},
:history {"silly-mcclintock" [{:ref "silly-mcclintock",
:root "silly-mcclintock",
:type :intemporal.workflow/invoke,
Expand All @@ -16,6 +18,8 @@
:error nil,
:result nil,
:id 17}
cccccbrfjtefettfeikinrkevrueftfcuttfningduuc

#_
{:ref "silly-mcclintock",
:root "silly-mcclintock",
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services:
- jaeger

foundation:
image: "foundationdb/foundationdb:7.1.60"
image: "foundationdb/foundationdb:7.3.62"
environment:
FDB_NETWORKING_MODE: host
entrypoint: ["/usr/bin/tini", "-g", "--", "sh", "/fdb-init.bash"]
Expand Down
2 changes: 1 addition & 1 deletion docker/fdb.cluster
Original file line number Diff line number Diff line change
@@ -1 +1 @@
docker:docker@192.168.107.2:4500
docker:docker@192.168.107.3:4500
4 changes: 2 additions & 2 deletions resources/migrations/postgres/20240326161343-initial.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS tasks (
type varchar(50) NOT NULL,
ref varchar(50) NULL,
root varchar(50) NULL,
sym varchar(100) NOT NULL,
sym varchar(200) NOT NULL,
args bytea NULL,
result bytea NULL,
state varchar(20) NOT NULL,
Expand All @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS events (
type varchar(50) NOT NULL,
ref varchar(50) NULL, --NOT NULL,
root varchar(50) NOT NULL,
sym varchar(50) NOT NULL,
sym varchar(200) NOT NULL,
args bytea NULL,
result bytea NULL,
--FOREIGN KEY (ref) REFERENCES tasks(id) on delete set null,
Expand Down
13 changes: 12 additions & 1 deletion src/intemporal/error.cljc
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
(ns intemporal.error)
(ns intemporal.error
#?(:clj (:import [java.lang InterruptedException]
[java.util.concurrent RejectedExecutionException])))


(defn interrupted? [e]
#?(:clj (instance? InterruptedException e)
:cljs false))

(defn rejected? [e]
#?(:clj (instance? RejectedExecutionException e)
:cljs false))

(defn internal-error? [ex]
(when-let [t (-> ex ex-data ::type)]
Expand Down
32 changes: 18 additions & 14 deletions src/intemporal/macros.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@
;; an embedded workflow engine doesn't need to have a task per invocation
(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id " id#])
(trace! {:name (format "activity: %s" (symbol fvar#)) :attributes {:task-id id#}}
(let [res# (i/resume-task i/*env* store# protos# task#)]
(macros/case
:cljs res#
:clj (deref res#))))))))
;(w/enqueue-and-wait i/*env* task#)))))
(w/enqueue-and-wait i/*env* task#)
#_(let [res# (i/resume-task i/*env* store# protos# task#)]
(macros/case
:cljs res#
:clj (deref res#))))))))

(defmacro stub-protocol
"Stub a protocol definition. Opts are currently unused.
Expand Down Expand Up @@ -159,8 +159,8 @@
id#)]

(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id" id#])
(i/resume-task i/*env* store# protos# task#))))))))
;(w/enqueue-and-wait i/*env* task#))))))))
;(i/resume-task i/*env* store# protos# task#))))))))
(w/enqueue-and-wait i/*env* task#))))))))

:clj
#_{:clj-kondo/ignore [:unresolved-symbol]}
Expand Down Expand Up @@ -203,21 +203,25 @@

(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id" id#])
(if (:vthread? i/*env*)

(trace-async! {:name (format "activity: %s" aid#) :attributes {:task-id id# :protocol (-> ~proto :var symbol)}}
@(i/resume-task i/*env* store# protos# task#))
#_
(trace! {:name (format "activity: %s" aid#) :attributes {:task-id id# :protocol (-> ~proto :var symbol)}}
(w/enqueue-and-wait i/*env* task#))
(trace! {:name (format "activity: %s" aid#) :attributes {:task-id id# :protocol (-> ~proto :var symbol)}}
@(i/resume-task i/*env* store# protos# task#)))))))))))
;(w/enqueue-and-wait i/*env* task#)))))))))
;@(i/resume-task i/*env* store# protos# task#)))))))))))
(w/enqueue-and-wait i/*env* task#)))))))))))

(defmacro with-failure
"Runs `fcall`, ensuring that if it fails, compensation will always run.
- if `fcall` fails, `binding` will have the value `intemporal.activity/failure`.
- if `fcall` succeeds, but compensation is invoked later (eg other activity failure), `binding` will have its return value
"Runs `body`, ensuring that if it fails, compensation will always run.
- if `body` fails, `binding` will have the value `intemporal.activity/failure`.
- if `body` succeeds, but compensation is invoked later (eg other activity failure), `binding` will have its return value

(with-failure [v (book-hotel stub \"hotel\")]
(cancel-hotel stub v n))
"
[[binding fcall] comp-fn]
[[binding body] comp-fn]
`(let [val# (atom :intemporal.activity/failure)]
(w/add-compensation (fn [] (let [~binding @val#] (do ~comp-fn))))
(reset! val# (do ~fcall))))
(reset! val# (do ~body))))
14 changes: 8 additions & 6 deletions src/intemporal/store.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,11 @@
find-task (fn [this id]
(get @tasks id))

update-task (fn [this id & kvs]
update-task (fn [this id attrs]
(when-let [w (find-task this id)]
(maybe-fail!)
(->> (apply assoc w kvs)
(si/validate-transition! w attrs)
(->> (merge w attrs)
(si/validate-task!)
(swap! tasks assoc id))))]

Expand Down Expand Up @@ -191,7 +192,7 @@
(vals @tasks)))

(task<-panic [this task-id error]
(update-task this task-id :result error))
(update-task this task-id {:result error}))

(task<-event [this task-id {:keys [id ref root type sym args result error] :as event-descr}]
;; some redundancy between :result in task and event
Expand All @@ -202,22 +203,22 @@
(let [evt {:ref ref :root root :type type :sym sym :args args :error nil :result nil}]
(when-not id
(save-event this task-id evt))
(update-task this task-id :state :pending)
(update-task this task-id {:state :pending})
evt)

(some? error)
(let [evt {:ref ref :root root :type type :sym sym :args nil :error error :result nil}]
(when-not id
(save-event this task-id evt))
(update-task this task-id :state :failure :result error)
(update-task this task-id {:state :failure :result error})
evt)

;;(some? result) ;result can be nil
:else
(let [evt {:ref ref :root root :type type :sym sym :args nil :error nil :result result}]
(when-not id
(save-event this task-id evt))
(update-task this task-id :state :success :result result)
(update-task this task-id {:state :success :result result})
evt)))

(find-task [this id]
Expand Down Expand Up @@ -288,6 +289,7 @@
(try
;; ensure we only run f once - swap! might run the fn multiple times
(assoc task :state :new :owner owner)
;; TODO log reenqueued task
(finally
(when-not (contains? @task->run? task)
(try
Expand Down
1 change: 1 addition & 0 deletions src/intemporal/store/foundationdb.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
;; not every invocation will come from a persisted task
(when task
(si/validate-task! updated-task)
(si/validate-transition! task updated-task)
(fc/set tx subspace-owned-tasks task-id (serialize updated-task)))
updated-evt)))

Expand Down
19 changes: 19 additions & 0 deletions src/intemporal/store/internal.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@
[:result {:optional true} :any]
[:error {:optional true} :any]])

;; valid task states
(def valid-state-transitions {:new #{:pending}
:pending #{:new :success :failure}})

(defn validate-transition!
"Ensures that the task's new `:state`, if any, is allowed.
Useful to implement compare-and-swap semantics"
[{:keys [state id]} attrs]
(let [next-states (get valid-state-transitions state)]
;; if we are updating state
;; and the new state is not allowed
;; error out
(when (and (contains? attrs :state)
(not= (:state attrs) state)
(not (contains? next-states (:state attrs))))
(throw (ex-info (str "Cannot update task with id " id " from state " state " to " (:state attrs)) {:task-id id
:state state
:next-state (:state attrs)})))))
(def validate-task!
"Throws if the task is not valid"
(m/coercer Task nil {:registry registry}))
Expand All @@ -107,6 +125,7 @@
(when-not (serializable? obj)
(throw (ex-info msg {:object obj})))))


(defn success? [{:keys [state] :as task}]
(= :success state))

Expand Down
Loading
Loading