0
\$\begingroup\$

I have a task which runs sequentially. If I have hundreds of thousands of tags, it is very time consuming. Can this be modified to run the task in parallel? Works fine upto 100 tags

 (:require [content :as db-content]           [impl.content-tag :as db-content-tag]           [story :as db-story]           [tag :as db-tag]           [transaction :as transaction]           [log :as log]           [config :as config]))(defn- find-story-ids-by-tag [tag-id] (map :id (db-content-tag/read-contents-by-tag-id (config/db-spec) tag-id)))(defn- update-story [txn publisher-id story-id tag-id] (let [{:keys [published-json]} (db-content/find-by-id txn story-id)       updated-tags (filter (fn [tag] (not= (:id tag) tag-id)) (:tags published-json))       _ (db-story/update-published-json-without-timestamps txn publisher-id story-id (assoc published-json :tags updated-tags))]   (log/info {:message "[TAG-DELETION] Updated Story Tags in Published JSON"              :publisher-id publisher-id              :tag-id tag-id              :story-id story-id              :updated-tags-json updated-tags})))(defn- delete-tag [publisher-id tag-id] (let [associated-content-ids (find-story-ids-by-tag tag-id)]   (transaction/with-transaction     [txn (config/db-spec)]     (do       (when (seq associated-content-ids)         (do           (doseq [story-id associated-content-ids]             (update-story txn publisher-id story-id tag-id))           (db-content-tag/delete-batch-by-tag txn tag-id associated-content-ids)           (log/info {:message "[TAG-DELETION] Deleted from Content Tag"                      :publisher-id publisher-id                      :tag-id tag-id                      :story-ids associated-content-ids})))       (db-tag/delete txn publisher-id tag-id)       (log/info {:message "[TAG-DELETION] Deleted Tag"                  :publisher-id publisher-id                  :tag-id tag-id})))))(defn run [publisher-id tag-ids] (comment run 123 [4 5 6]) (try   (do     (log/info {:message "[TAG-DELETION] started"                :publisher-id publisher-id                :tag-ids tag-ids})     (doseq [tag-id tag-ids] (if (db-tag/find-by-id (config/db-spec) publisher-id tag-id)                               (delete-tag publisher-id tag-id)                               (log/info {:message "[TAG-DELETION] Tag Not Found"                                          :publisher-id publisher-id                                          :tag-id tag-id})))     (log/info {:message "[TAG-DELETION] completed"                :publisher-id publisher-id                :tag-ids tag-ids}))   (catch Exception e     (log/exception e {:message "[TAG-DELETION] errored"                       :publisher-id publisher-id                       :tag-ids tag-ids}))))```
pacmaninbw's user avatar
pacmaninbw
26.2k13 gold badges47 silver badges114 bronze badges
askedJun 5, 2023 at 22:49
Bindiya H's user avatar
\$\endgroup\$
2
  • \$\begingroup\$Welcome to Code Review! The current question title, which states your concerns about the code, is too general to be useful here. Pleaseedit to the site standard, which is for the title to simply statethe task accomplished by the code. Please seeHow to get the best value out of Code Review: Asking Questions for guidance on writing good question titles.\$\endgroup\$CommentedJun 6, 2023 at 15:07
  • \$\begingroup\$Do you expect a speed-up factor into the thousands fromrunning the task in parallel?\$\endgroup\$CommentedJun 7, 2023 at 7:35

1 Answer1

2
\$\begingroup\$

You want to usethe Claypoole library. Sample code:

(ns tst.demo.core  (:use tupelo.core tupelo.test)  (:require    [com.climate.claypoole :as pool]    ))(defn task [] (Thread/sleep 10))(defn slow  []  (newline) (prn :slow)  (doseq [i (range 100)]    (task)))(defn fast  []  (newline) (prn :fast)  (pool/pdoseq 8 [i (range 100)]    (task)))(verify  (time (slow))  (time (fast)))

with results:

time (clojure -X:test)Running tests in #{"test"}Testing tst._bootstrap------------------------------------------   Clojure 1.12.0-alpha3    Java 20.0.1------------------------------------------Testing tst.demo.core:slow"Elapsed time: 1012.908212 msecs":fast"Elapsed time: 137.932203 msecs"Ran 2 tests containing 0 assertions.0 failures, 0 errors.  36.50s user 0.85s system 284% cpu 13.150 total

So we see an 8x speedup by using 1 thread on each cpu core (i.e. the8 afterpdoseq). For I/O dominated workloads, you could benefit by using many more threads (eg 64).

Built usingmy favorite template project.

answeredJun 6, 2023 at 1:29
Alan Thompson's user avatar
\$\endgroup\$

You mustlog in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.