初始提交: UE5.3项目基础框架

This commit is contained in:
2025-10-14 11:14:54 +08:00
commit 721d9fd98e
5334 changed files with 316782 additions and 0 deletions

View File

@ -0,0 +1,360 @@
#pragma once
#include "Future.h"
#include "Impl/ContinuationFutureType.h"
#include "Impl/RemoveFuture.h"
#include "Impl/WithTracing.h"
#include "Impl/cesium-async++.h"
#include "Library.h"
#include "Promise.h"
#include "ThreadPool.h"
#include <CesiumUtility/Tracing.h>
#include <memory>
#include <type_traits>
namespace CesiumAsync {
class ITaskProcessor;
class AsyncSystem;
/**
* @brief A system for managing asynchronous requests and tasks.
*
* Instances of this class may be safely and efficiently stored and passed
* around by value. However, it is essential that the _last_ AsyncSystem
* instance be destroyed only after all continuations have run to completion.
* Otherwise, continuations may be scheduled using invalid scheduler instances,
* leading to a crash. Broadly, there are two ways to achieve this:
*
* * Wait until all Futures complete before destroying the "owner" of the
* AsyncSystem.
* * Make the AsyncSystem a global or static local in order to extend its
* lifetime all the way until program termination.
*/
class CESIUMASYNC_API AsyncSystem final {
public:
/**
* @brief Constructs a new instance.
*
* @param pTaskProcessor The interface used to run tasks in background
* threads.
*/
AsyncSystem(const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept;
/**
* @brief Creates a new Future by immediately invoking a function and giving
* it the opportunity to resolve or reject a {@link Promise}.
*
* The {@link Promise} passed to the callback `f` may be resolved or rejected
* asynchronously, even after the function has returned.
*
* This method is very similar to {@link AsyncSystem::createPromise}, except
* that that method returns the Promise directly. The advantage of using this
* method instead is that it is more exception-safe. If the callback `f`
* throws an exception, the `Future` will be rejected automatically and the
* exception will not escape the callback.
*
* @tparam T The type that the Future resolves to.
* @tparam Func The type of the callback function.
* @param f The callback function to invoke immediately to create the Future.
* @return A Future that will resolve when the callback function resolves the
* supplied Promise.
*/
template <typename T, typename Func> Future<T> createFuture(Func&& f) const {
std::shared_ptr<async::event_task<T>> pEvent =
std::make_shared<async::event_task<T>>();
Promise<T> promise(this->_pSchedulers, pEvent);
try {
f(promise);
} catch (...) {
promise.reject(std::current_exception());
}
return Future<T>(this->_pSchedulers, pEvent->get_task());
}
/**
* @brief Create a Promise that can be used at a later time to resolve or
* reject a Future.
*
* Use {@link Promise<T>::getFuture} to get the Future that is resolved
* or rejected when this Promise is resolved or rejected.
*
* Consider using {@link AsyncSystem::createFuture} instead of this method.
*
* @tparam T The type that is provided when resolving the Promise and the type
* that the associated Future resolves to. Future.
* @return The Promise.
*/
template <typename T> Promise<T> createPromise() const {
return Promise<T>(
this->_pSchedulers,
std::make_shared<async::event_task<T>>());
}
/**
* @brief Runs a function in a worker thread, returning a Future that
* resolves when the function completes.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this method is called from a designated worker thread, the
* callback will be invoked immediately and complete before this function
* returns.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, void>
runInWorkerThread(Func&& f) const {
static const char* tracingName = "waiting for worker thread";
CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
return CesiumImpl::ContinuationFutureType_t<Func, void>(
this->_pSchedulers,
async::spawn(
this->_pSchedulers->workerThread.immediate,
CesiumImpl::WithTracing<void>::end(
tracingName,
std::forward<Func>(f))));
}
/**
* @brief Runs a function in the main thread, returning a Future that
* resolves when the function completes.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this method is called from the main thread, the callback will be invoked
* immediately and complete before this function returns.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, void>
runInMainThread(Func&& f) const {
static const char* tracingName = "waiting for main thread";
CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
return CesiumImpl::ContinuationFutureType_t<Func, void>(
this->_pSchedulers,
async::spawn(
this->_pSchedulers->mainThread.immediate,
CesiumImpl::WithTracing<void>::end(
tracingName,
std::forward<Func>(f))));
}
/**
* @brief Runs a function in a thread pool, returning a Future that resolves
* when the function completes.
*
* @tparam Func The type of the function.
* @param threadPool The thread pool in which to run the function.
* @param f The function to run.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, void>
runInThreadPool(const ThreadPool& threadPool, Func&& f) const {
static const char* tracingName = "waiting for thread pool";
CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
return CesiumImpl::ContinuationFutureType_t<Func, void>(
this->_pSchedulers,
async::spawn(
threadPool._pScheduler->immediate,
CesiumImpl::WithTracing<void>::end(
tracingName,
std::forward<Func>(f))));
}
/**
* @brief The value type of the Future returned by {@link all}.
*
* This will be either `std::vector<T>`, if the input Futures passed to the
* `all` function return values, or `void` if they do not.
*
* @tparam T The value type of the input Futures passed to the function.
*/
template <typename T>
using AllValueType =
std::conditional_t<std::is_void_v<T>, void, std::vector<T>>;
/**
* @brief Creates a Future that resolves when every Future in a vector
* resolves, and rejects when any Future in the vector rejects.
*
* If the input Futures resolve to non-void values, the returned Future
* resolves to a vector of the values, in the same order as the input Futures.
* If the input Futures resolve to void, the returned Future resolves to void
* as well.
*
* If any of the Futures rejects, the returned Future rejects as well. The
* exception included in the rejection will be from the first Future in the
* vector that rejects.
*
* To get detailed rejection information from each of the Futures,
* attach a `catchInMainThread` continuation prior to passing the
* list into `all`.
*
* @tparam T The type that each Future resolves to.
* @param futures The list of futures.
* @return A Future that resolves when all the given Futures resolve, and
* rejects when any Future in the vector rejects.
*/
template <typename T>
Future<AllValueType<T>> all(std::vector<Future<T>>&& futures) const {
return this->all<T, Future<T>>(
std::forward<std::vector<Future<T>>>(futures));
}
/**
* @brief Creates a Future that resolves when every Future in a vector
* resolves, and rejects when any Future in the vector rejects.
*
* If the input SharedFutures resolve to non-void values, the returned Future
* resolves to a vector of the values, in the same order as the input
* SharedFutures. If the input SharedFutures resolve to void, the returned
* Future resolves to void as well.
*
* If any of the SharedFutures rejects, the returned Future rejects as well.
* The exception included in the rejection will be from the first SharedFuture
* in the vector that rejects.
*
* To get detailed rejection information from each of the SharedFutures,
* attach a `catchInMainThread` continuation prior to passing the
* list into `all`.
*
* @tparam T The type that each SharedFuture resolves to.
* @param futures The list of shared futures.
* @return A Future that resolves when all the given SharedFutures resolve,
* and rejects when any SharedFuture in the vector rejects.
*/
template <typename T>
Future<AllValueType<T>> all(std::vector<SharedFuture<T>>&& futures) const {
return this->all<T, SharedFuture<T>>(
std::forward<std::vector<SharedFuture<T>>>(futures));
}
/**
* @brief Creates a future that is already resolved.
*
* @tparam T The type of the future.
* @param value The value for the future.
* @return The future.
*/
template <typename T> Future<T> createResolvedFuture(T&& value) const {
return Future<T>(
this->_pSchedulers,
async::make_task<T>(std::forward<T>(value)));
}
/**
* @brief Creates a future that is already resolved and resolves to no value.
*
* @return The future.
*/
Future<void> createResolvedFuture() const {
return Future<void>(this->_pSchedulers, async::make_task());
}
/**
* @brief Runs all tasks that are currently queued for the main thread.
*
* The tasks are run in the calling thread.
*/
void dispatchMainThreadTasks();
/**
* @brief Runs a single waiting task that is currently queued for the main
* thread. If there are no tasks waiting, it returns immediately without
* running any tasks.
*
* The task is run in the calling thread.
*
* @return true A single task was executed.
* @return false No task was executed because none are waiting.
*/
bool dispatchOneMainThreadTask();
/**
* @brief Creates a new thread pool that can be used to run continuations.
*
* @param numberOfThreads The number of threads in the pool.
* @return The thread pool.
*/
ThreadPool createThreadPool(int32_t numberOfThreads) const;
/**
* Returns true if this instance and the right-hand side can be used
* interchangeably because they schedule continuations identically. Otherwise,
* returns false.
*/
bool operator==(const AsyncSystem& rhs) const noexcept;
/**
* Returns true if this instance and the right-hand side can _not_ be used
* interchangeably because they schedule continuations differently. Otherwise,
* returns false.
*/
bool operator!=(const AsyncSystem& rhs) const noexcept;
private:
// Common implementation of 'all' for both Future and SharedFuture.
template <typename T, typename TFutureType>
Future<AllValueType<T>> all(std::vector<TFutureType>&& futures) const {
using TTaskType = decltype(TFutureType::_task);
std::vector<TTaskType> tasks;
tasks.reserve(futures.size());
for (auto it = futures.begin(); it != futures.end(); ++it) {
tasks.emplace_back(std::move(it->_task));
}
futures.clear();
async::task<AllValueType<T>> task =
async::when_all(tasks.begin(), tasks.end())
.then(
async::inline_scheduler(),
[](std::vector<TTaskType>&& tasks) {
if constexpr (std::is_void_v<T>) {
// Tasks return void. "Get" each task so that error
// information is propagated.
for (auto it = tasks.begin(); it != tasks.end(); ++it) {
it->get();
}
} else {
// Get all the results. If any tasks rejected, we'll bail
// with an exception.
std::vector<T> results;
results.reserve(tasks.size());
for (auto it = tasks.begin(); it != tasks.end(); ++it) {
results.emplace_back(std::move(it->get()));
}
return results;
}
});
return Future<AllValueType<T>>(this->_pSchedulers, std::move(task));
}
std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
template <typename T> friend class Future;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,119 @@
#pragma once
#include "HttpHeaders.h"
#include "Library.h"
#include <cstddef>
#include <cstdint>
#include <ctime>
#include <map>
#include <span>
#include <vector>
namespace CesiumAsync {
/**
* @brief Cache response retrieved from the cache database.
*/
class CESIUMASYNC_API CacheResponse {
public:
/**
* @brief Constructor.
* @param cacheStatusCode the status code of the response
* @param cacheHeaders the headers of the response
* @param cacheData the body of the response
*/
CacheResponse(
uint16_t cacheStatusCode,
HttpHeaders&& cacheHeaders,
std::vector<std::byte>&& cacheData)
: statusCode(cacheStatusCode),
headers(std::move(cacheHeaders)),
data(std::move(cacheData)) {}
/**
* @brief The status code of the response.
*/
uint16_t statusCode;
/**
* @brief The headers of the response.
*/
HttpHeaders headers;
/**
* @brief The body data of the response.
*/
std::vector<std::byte> data;
};
/**
* @brief Cache request retrieved from the cache database.
*/
class CESIUMASYNC_API CacheRequest {
public:
/**
* @brief Constructor.
* @param cacheHeaders the headers of the request
* @param cacheMethod the method of the request
* @param cacheUrl the url of the request
*/
CacheRequest(
HttpHeaders&& cacheHeaders,
std::string&& cacheMethod,
std::string&& cacheUrl)
: headers(std::move(cacheHeaders)),
method(std::move(cacheMethod)),
url(std::move(cacheUrl)) {}
/**
* @brief The headers of the request.
*/
HttpHeaders headers;
/**
* @brief The method of the request.
*/
std::string method;
/**
* @brief The url of the request.
*/
std::string url;
};
/**
* @brief Cache item retrieved from the cache database.
*/
class CESIUMASYNC_API CacheItem {
public:
/**
* @brief Constructor.
* @param cacheExpiryTime the time point this cache item will be expired
* @param request the cache request owned by this item
* @param response the cache response owned by this item
*/
CacheItem(
std::time_t cacheExpiryTime,
CacheRequest&& request,
CacheResponse&& response)
: expiryTime(cacheExpiryTime),
cacheRequest(std::move(request)),
cacheResponse(std::move(response)) {}
/**
* @brief The time point that this cache item is expired.
*/
std::time_t expiryTime;
/**
* @brief The cache request owned by this cache item.
*/
CacheRequest cacheRequest;
/**
* @brief The cache response owned by this cache item.
*/
CacheResponse cacheResponse;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,72 @@
#pragma once
#include "IAssetAccessor.h"
#include "IAssetRequest.h"
#include "ICacheDatabase.h"
#include "ThreadPool.h"
#include <spdlog/fwd.h>
#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
namespace CesiumAsync {
class AsyncSystem;
/**
* @brief A decorator for an {@link IAssetAccessor} that caches requests and
* responses in an {@link ICacheDatabase}.
*
* This can be used to improve asset loading performance by caching assets
* across runs.
*/
class CachingAssetAccessor : public IAssetAccessor {
public:
/**
* @brief Constructs a new instance.
*
* @param pLogger The logger that receives messages about the status of this
* instance.
* @param pAssetAccessor The underlying {@link IAssetAccessor} used to
* retrieve assets that are not in the cache.
* @param pCacheDatabase The database in which to cache requests and
* responses.
* @param requestsPerCachePrune The number of requests to handle before each
* {@link ICacheDatabase::prune} of old cached results from the database.
*/
CachingAssetAccessor(
const std::shared_ptr<spdlog::logger>& pLogger,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
const std::shared_ptr<ICacheDatabase>& pCacheDatabase,
int32_t requestsPerCachePrune = 10000);
virtual ~CachingAssetAccessor() noexcept override;
/** @copydoc IAssetAccessor::get */
virtual Future<std::shared_ptr<IAssetRequest>>
get(const AsyncSystem& asyncSystem,
const std::string& url,
const std::vector<THeader>& headers) override;
virtual Future<std::shared_ptr<IAssetRequest>> request(
const AsyncSystem& asyncSystem,
const std::string& verb,
const std::string& url,
const std::vector<THeader>& headers,
const std::span<const std::byte>& contentPayload) override;
/** @copydoc IAssetAccessor::tick */
virtual void tick() noexcept override;
private:
int32_t _requestsPerCachePrune;
std::atomic<int32_t> _requestSinceLastPrune;
std::shared_ptr<spdlog::logger> _pLogger;
std::shared_ptr<IAssetAccessor> _pAssetAccessor;
std::shared_ptr<ICacheDatabase> _pCacheDatabase;
ThreadPool _cacheThreadPool;
CESIUM_TRACE_DECLARE_TRACK_SET(_pruneSlots, "Prune cache database")
};
} // namespace CesiumAsync

View File

@ -0,0 +1,351 @@
#pragma once
#include "Impl/AsyncSystemSchedulers.h"
#include "Impl/CatchFunction.h"
#include "Impl/ContinuationFutureType.h"
#include "Impl/WithTracing.h"
#include "SharedFuture.h"
#include "ThreadPool.h"
#include <CesiumUtility/Tracing.h>
#include <variant>
namespace CesiumAsync {
namespace CesiumImpl {
template <typename R> struct ParameterizedTaskUnwrapper;
struct TaskUnwrapper;
} // namespace CesiumImpl
/**
* @brief A value that will be available in the future, as produced by
* {@link AsyncSystem}.
*
* @tparam T The type of the value.
*/
template <typename T> class Future final {
public:
/**
* @brief Move constructor
*/
Future(Future<T>&& rhs) noexcept
: _pSchedulers(std::move(rhs._pSchedulers)),
_task(std::move(rhs._task)) {}
/**
* @brief Move assignment operator.
*/
Future<T>& operator=(Future<T>&& rhs) noexcept {
this->_pSchedulers = std::move(rhs._pSchedulers);
this->_task = std::move(rhs._task);
return *this;
}
Future(const Future<T>& rhs) = delete;
Future<T>& operator=(const Future<T>& rhs) = delete;
/**
* @brief Registers a continuation function to be invoked in a worker thread
* when this Future resolves, and invalidates this Future.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this Future is resolved from a designated worker thread, the
* continuation function will be invoked immediately rather than in a
* separate task. Similarly, if the Future is already resolved when
* `thenInWorkerThread` is called from a designated worker thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T>
thenInWorkerThread(Func&& f) && {
return std::move(*this).thenWithScheduler(
this->_pSchedulers->workerThread.immediate,
"waiting for worker thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked in the main thread
* when this Future resolves, and invalidates this Future.
*
* If this Future is resolved from the main thread, the
* continuation function will be invoked immediately rather than queued for
* later execution in the main thread. Similarly, if the Future is already
* resolved when `thenInMainThread` is called from the main thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T> thenInMainThread(Func&& f) && {
return std::move(*this).thenWithScheduler(
this->_pSchedulers->mainThread.immediate,
"waiting for main thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked immediately in
* whichever thread causes the Future to be resolved, and invalidates this
* Future.
*
* If the Future is already resolved, the supplied function will be called
* immediately in the calling thread and this method will not return until
* that function does.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T> thenImmediately(Func&& f) && {
return CesiumImpl::ContinuationFutureType_t<Func, T>(
this->_pSchedulers,
_task.then(
async::inline_scheduler(),
CesiumImpl::WithTracing<T>::end(nullptr, std::forward<Func>(f))));
}
/**
* @brief Registers a continuation function to be invoked in a thread pool
* when this Future resolves, and invalidates this Future.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this Future is resolved from a thread pool thread, the
* continuation function will be invoked immediately rather than in a
* separate task. Similarly, if the Future is already resolved when
* `thenInThreadPool` is called from a designated thread pool thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* @tparam Func The type of the function.
* @param threadPool The thread pool where this function will be invoked.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T>
thenInThreadPool(const ThreadPool& threadPool, Func&& f) && {
return std::move(*this).thenWithScheduler(
threadPool._pScheduler->immediate,
"waiting for thread pool thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked in the main thread
* when this Future rejects, and invalidates this Future.
*
* If this Future is rejected from the main thread, the
* continuation function will be invoked immediately rather than queued for
* later execution in the main thread. Similarly, if the Future is already
* rejected when `catchInMainThread` is called from the main thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* Any `then` continuations chained after this one will be invoked with the
* return value of the catch callback.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func> Future<T> catchInMainThread(Func&& f) && {
return std::move(*this).catchWithScheduler(
this->_pSchedulers->mainThread.immediate,
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked immediately, and
* invalidates this Future.
*
* When this Future is rejected, the continuation function will be invoked
* in whatever thread does the rejection. Similarly, if the Future is already
* rejected when `catchImmediately` is called, the continuation function will
* be invoked immediately before this method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* Any `then` continuations chained after this one will be invoked with the
* return value of the catch callback.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func> Future<T> catchImmediately(Func&& f) && {
return std::move(*this).catchWithScheduler(
async::inline_scheduler(),
std::forward<Func>(f));
}
/**
* @brief Passes through one or more additional values to the next
* continuation.
*
* The next continuation will receive a tuple with each of the provided
* values, followed by the result of the current Future.
*
* @tparam TPassThrough The types to pass through to the next continuation.
* @param values The values to pass through to the next continuation.
* @return A new Future that resolves to a tuple with the pass-through values,
* followed by the result of the last Future.
*/
template <typename... TPassThrough>
Future<std::tuple<std::remove_cvref_t<TPassThrough>..., T>>
thenPassThrough(TPassThrough&&... values) && {
return std::move(*this).thenImmediately(
[values = std::tuple(std::forward<TPassThrough>(values)...)](
T&& result) mutable {
return std::tuple_cat(
std::move(values),
std::make_tuple(std::move(result)));
});
}
/**
* @brief Waits for the future to resolve or reject and returns the result.
*
* This method must not be called from the main thread, the one that calls
* {@link AsyncSystem::dispatchMainThreadTasks}. Doing so can lead to a
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T wait() { return this->_task.get(); }
/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}
/**
* @brief Determines if this future is already resolved or rejected.
*
* If this method returns true, it is guaranteed that {@link wait} will
* not block but will instead immediately return a value or throw an
* exception.
*
* @return True if the future is already resolved or rejected and {@link wait}
* will not block; otherwise, false.
*/
bool isReady() const { return this->_task.ready(); }
/**
* @brief Creates a version of this future that can be shared, meaning that
* its value may be accessed multiple times and multiple continuations may be
* attached to it.
*
* Calling this method invalidates the original Future.
*
* @return The `SharedFuture`.
*/
SharedFuture<T> share() && {
return SharedFuture<T>(this->_pSchedulers, this->_task.share());
}
private:
Future(
const std::shared_ptr<CesiumImpl::AsyncSystemSchedulers>& pSchedulers,
async::task<T>&& task) noexcept
: _pSchedulers(pSchedulers), _task(std::move(task)) {}
template <typename Func, typename Scheduler>
CesiumImpl::ContinuationFutureType_t<Func, T> thenWithScheduler(
Scheduler& scheduler,
const char* tracingName,
Func&& f) && {
// It would be nice if tracingName were a template parameter instead of a
// function parameter, but that triggers a bug in VS2017. It was previously
// a bug in VS2019, too, but has been fixed there:
// https://developercommunity.visualstudio.com/t/internal-compiler-error-when-compiling-a-template-1/534210
#if CESIUM_TRACING_ENABLED
// When tracing is enabled, we measure the time between scheduling and
// dispatching of the work.
auto task = this->_task.then(
async::inline_scheduler(),
CesiumImpl::WithTracing<T>::begin(tracingName, std::forward<Func>(f)));
#else
auto& task = this->_task;
#endif
return CesiumImpl::ContinuationFutureType_t<Func, T>(
this->_pSchedulers,
task.then(
scheduler,
CesiumImpl::WithTracing<T>::end(
tracingName,
std::forward<Func>(f))));
}
template <typename Func, typename Scheduler>
CesiumImpl::ContinuationFutureType_t<Func, std::exception>
catchWithScheduler(Scheduler& scheduler, Func&& f) && {
return CesiumImpl::ContinuationFutureType_t<Func, std::exception>(
this->_pSchedulers,
this->_task.then(
async::inline_scheduler(),
CesiumImpl::CatchFunction<Func, T, Scheduler>{
scheduler,
std::forward<Func>(f)}));
}
std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
async::task<T> _task;
friend class AsyncSystem;
template <typename R> friend struct CesiumImpl::ParameterizedTaskUnwrapper;
friend struct CesiumImpl::TaskUnwrapper;
template <typename R> friend class Future;
template <typename R> friend class SharedFuture;
template <typename R> friend class Promise;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,44 @@
#pragma once
#include "IAssetAccessor.h"
#include "IAssetRequest.h"
namespace CesiumAsync {
class AsyncSystem;
/**
* @brief A decorator for an {@link IAssetAccessor} that automatically unzips
* gzipped asset responses from the underlying Asset Accessor.
*/
class GunzipAssetAccessor : public IAssetAccessor {
public:
/**
* @brief Constructs a new instance.
*
* @param pAssetAccessor The underlying {@link IAssetAccessor} used to
* retrieve assets that may or may not be zipped.
*/
GunzipAssetAccessor(const std::shared_ptr<IAssetAccessor>& pAssetAccessor);
virtual ~GunzipAssetAccessor() noexcept override;
/** @copydoc IAssetAccessor::get */
virtual Future<std::shared_ptr<IAssetRequest>>
get(const AsyncSystem& asyncSystem,
const std::string& url,
const std::vector<THeader>& headers) override;
virtual Future<std::shared_ptr<IAssetRequest>> request(
const AsyncSystem& asyncSystem,
const std::string& verb,
const std::string& url,
const std::vector<THeader>& headers,
const std::span<const std::byte>& contentPayload) override;
/** @copydoc IAssetAccessor::tick */
virtual void tick() noexcept override;
private:
std::shared_ptr<IAssetAccessor> _pAssetAccessor;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,26 @@
#pragma once
#include <map>
#include <string>
namespace CesiumAsync {
/**
* @brief A case-insensitive `less-then` string comparison.
*
* This can be used as a `Compare` function, for example for a `std::map`.
* It will compare strings case-insensitively, by converting them to
* lower-case and comparing the results (leaving the exact behavior for
* non-ASCII strings unspecified).
*/
struct CaseInsensitiveCompare {
/** @brief Performs a case-insensitive comparison of the two strings using
* `std::lexicographical_compare`. */
bool operator()(const std::string& s1, const std::string& s2) const;
};
/**
* @brief Http Headers that maps case-insensitive header key with header value.
*/
using HttpHeaders = std::map<std::string, std::string, CaseInsensitiveCompare>;
} // namespace CesiumAsync

View File

@ -0,0 +1,73 @@
#pragma once
#include "Future.h"
#include "IAssetRequest.h"
#include "Library.h"
#include <cstddef>
#include <memory>
#include <span>
#include <string>
#include <vector>
namespace CesiumAsync {
class AsyncSystem;
/**
* @brief Provides asynchronous access to assets, usually files downloaded via
* HTTP.
*/
class CESIUMASYNC_API IAssetAccessor {
public:
/**
* @brief An HTTP header represented as a key/value pair.
*/
typedef std::pair<std::string, std::string> THeader;
virtual ~IAssetAccessor() = default;
/**
* @brief Starts a new request for the asset with the given URL.
* The request proceeds asynchronously without blocking the calling thread.
*
* @param asyncSystem The async system used to do work in threads.
* @param url The URL of the asset.
* @param headers The headers to include in the request.
* @return The in-progress asset request.
*/
virtual CesiumAsync::Future<std::shared_ptr<IAssetRequest>>
get(const AsyncSystem& asyncSystem,
const std::string& url,
const std::vector<THeader>& headers = {}) = 0;
/**
* @brief Starts a new request to the given URL, using the provided HTTP verb
* and the provided content payload.
*
* The request proceeds asynchronously without blocking the calling thread.
*
* @param asyncSystem The async system used to do work in threads.
* @param verb The HTTP verb to use, such as "POST" or "PATCH".
* @param url The URL of the asset.
* @param headers The headers to include in the request.
* @param contentPayload The payload data to include in the request.
* @return The in-progress asset request.
*/
virtual CesiumAsync::Future<std::shared_ptr<IAssetRequest>> request(
const AsyncSystem& asyncSystem,
const std::string& verb,
const std::string& url,
const std::vector<THeader>& headers = std::vector<THeader>(),
const std::span<const std::byte>& contentPayload = {}) = 0;
/**
* @brief Ticks the asset accessor system while the main thread is blocked.
*
* If the asset accessor is not dependent on the main thread to
* dispatch requests, this method does not need to do anything.
*/
virtual void tick() noexcept = 0;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,45 @@
#pragma once
#include "HttpHeaders.h"
#include "Library.h"
#include <functional>
#include <string>
namespace CesiumAsync {
class IAssetResponse;
/**
* @brief An asynchronous request for an asset, usually a file
* downloaded via HTTP.
*/
class CESIUMASYNC_API IAssetRequest {
public:
virtual ~IAssetRequest() = default;
/**
* @brief Gets the request's method. This method may be called from any
* thread.
*/
virtual const std::string& method() const = 0;
/**
* @brief Gets the requested URL. This method may be called from any thread.
*/
virtual const std::string& url() const = 0;
/**
* @brief Gets the request's header. This method may be called from any
* thread.
*/
virtual const HttpHeaders& headers() const = 0;
/**
* @brief Gets the response, or nullptr if the request is still in progress.
* This method may be called from any thread.
*/
virtual const IAssetResponse* response() const = 0;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,45 @@
#pragma once
#include "HttpHeaders.h"
#include "Library.h"
#include <cstddef>
#include <cstdint>
#include <map>
#include <span>
#include <string>
namespace CesiumAsync {
/**
* @brief A completed response for a 3D Tiles asset.
*/
class CESIUMASYNC_API IAssetResponse {
public:
/**
* @brief Default destructor
*/
virtual ~IAssetResponse() = default;
/**
* @brief Returns the HTTP response code.
*/
virtual uint16_t statusCode() const = 0;
/**
* @brief Returns the HTTP content type
*/
virtual std::string contentType() const = 0;
/**
* @brief Returns the HTTP headers of the response
*/
virtual const HttpHeaders& headers() const = 0;
/**
* @brief Returns the data of this response
*/
virtual std::span<const std::byte> data() const = 0;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,73 @@
#pragma once
#include "CacheItem.h"
#include "IAssetRequest.h"
#include "Library.h"
#include <cstddef>
#include <optional>
namespace CesiumAsync {
/**
* @brief Provides database storage interface to cache completed request.
*/
class CESIUMASYNC_API ICacheDatabase {
public:
virtual ~ICacheDatabase() noexcept = default;
/**
* @brief Gets a cache entry from the database.
*
* If an error prevents checking the database for the key, this function,
* depending on the implementation, may log the error. However, it should
* return `std::nullopt`. It should not throw an exception.
*
* @param key The unique key associated with the cache entry.
* @return The result of the cache lookup, or `std::nullopt` if the key does
* not exist in the cache or an error occurred.
*/
virtual std::optional<CacheItem> getEntry(const std::string& key) const = 0;
/**
* @brief Store a cache entry in the database.
*
* @param key the unique key associated with the response
* @param expiryTime the time point that this response should be expired. An
* expired response will be removed when prunning the database.
* @param url The URL being cached.
* @param requestMethod The HTTP method being cached.
* @param requestHeaders The HTTP request headers being cached.
* @param statusCode The HTTP response status code being cached.
* @param responseHeaders The HTTP response headers being cached.
* @param responseData The HTTP response being cached.
* @return `true` if the entry was successfully stored, or `false` if it could
* not be stored due to an error.
*/
virtual bool storeEntry(
const std::string& key,
std::time_t expiryTime,
const std::string& url,
const std::string& requestMethod,
const HttpHeaders& requestHeaders,
uint16_t statusCode,
const HttpHeaders& responseHeaders,
const std::span<const std::byte>& responseData) = 0;
/**
* @brief Remove cache entries from the database to satisfy the database
* invariant condition (.e.g exired response or LRU).
*
* @return `true` if the database was successfully pruned, or `false` if it
* could not be pruned due to an errror.
*/
virtual bool prune() = 0;
/**
* @brief Removes all cache entries from the database.
*
* @return `true` if the database was successfully cleared, or `false` if it
* could not be pruned due to an errror.
*/
virtual bool clearAll() = 0;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,29 @@
#pragma once
#include "Library.h"
#include <functional>
namespace CesiumAsync {
/**
* @brief When implemented by a rendering engine, allows tasks to be
* asynchronously executed in background threads.
*
* Not supposed to be used by clients.
*/
class ITaskProcessor {
public:
/**
* @brief Default destructor
*/
virtual ~ITaskProcessor() = default;
/**
* @brief Starts a task that executes the given function in a background
* thread.
*
* @param f The function to execute
*/
virtual void startTask(std::function<void()> f) = 0;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,27 @@
#pragma once
#include "QueuedScheduler.h"
#include "TaskScheduler.h"
#include "cesium-async++.h"
namespace CesiumAsync {
class ITaskProcessor;
namespace CesiumImpl {
// Begin omitting doxygen warnings for Impl namespace
//! @cond Doxygen_Suppress
class AsyncSystemSchedulers {
public:
AsyncSystemSchedulers(const std::shared_ptr<ITaskProcessor>& pTaskProcessor)
: mainThread(), workerThread(pTaskProcessor) {}
QueuedScheduler mainThread;
TaskScheduler workerThread;
};
//! @endcond
// End omitting doxygen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,76 @@
#pragma once
#include "unwrapFuture.h"
namespace CesiumAsync {
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
template <
typename Func,
typename T,
typename Scheduler,
typename TaskParameter = async::task<T>&&>
struct CatchFunction {
Scheduler& scheduler;
Func f;
async::task<T> operator()(TaskParameter t) {
try {
return async::make_task(t.get());
} catch (...) {
// Make an exception_ptr task, then scheduler to a wrapper around f that
// throws it, catches it, and calls f with a reference to it.
auto ptrToException = [f = std::move(f)](std::exception_ptr&& e) mutable {
try {
std::rethrow_exception(e);
} catch (std::exception& e) {
return f(std::move(e));
} catch (...) {
return f(std::runtime_error("Unknown exception"));
}
};
return async::make_task(std::current_exception())
.then(
scheduler,
unwrapFuture<decltype(ptrToException), std::exception_ptr>(
std::move(ptrToException)));
}
}
};
template <typename Func, typename Scheduler, typename TaskParameter>
struct CatchFunction<Func, void, Scheduler, TaskParameter> {
Scheduler& scheduler;
Func f;
async::task<void> operator()(TaskParameter t) {
try {
t.get();
return async::make_task();
} catch (...) {
// Make an exception_ptr task, then scheduler to a wrapper around f that
// throws it, catches it, and calls f with a reference to it.
auto ptrToException = [f = std::move(f)](std::exception_ptr&& e) mutable {
try {
std::rethrow_exception(e);
} catch (std::exception& e) {
return f(std::move(e));
} catch (...) {
return f(std::runtime_error("Unknown exception"));
}
};
return async::make_task(std::current_exception())
.then(
scheduler,
unwrapFuture<decltype(ptrToException), std::exception_ptr>(
std::move(ptrToException)));
}
}
};
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,25 @@
#pragma once
#include "ContinuationReturnType.h"
#include "RemoveFuture.h"
namespace CesiumAsync {
template <typename T> class Future;
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
template <typename Func, typename T> struct ContinuationFutureType {
using type = Future<typename RemoveFuture<
typename ContinuationReturnType<Func, T>::type>::type>;
};
template <typename Func, typename T>
using ContinuationFutureType_t = typename ContinuationFutureType<Func, T>::type;
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,21 @@
#pragma once
#include <type_traits>
namespace CesiumAsync {
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
template <typename Func, typename T> struct ContinuationReturnType {
using type = typename std::invoke_result<Func, T>::type;
};
template <typename Func> struct ContinuationReturnType<Func, void> {
using type = typename std::invoke_result<Func>::type;
};
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,97 @@
#pragma once
#include "cesium-async++.h"
#include <CesiumUtility/Assert.h>
#include <spdlog/spdlog.h>
namespace CesiumAsync {
// Begin omitting doxygen warnings for Impl namespace
//! @cond Doxygen_Suppress
namespace CesiumImpl {
template <typename TScheduler> class ImmediateScheduler {
public:
explicit ImmediateScheduler(TScheduler* pScheduler) noexcept
: _pScheduler(pScheduler) {}
void schedule(async::task_run_handle t) {
// Are we already in a suitable thread?
std::vector<TScheduler*>& inSuitable =
ImmediateScheduler<TScheduler>::getSchedulersCurrentlyDispatching();
if (std::find(inSuitable.begin(), inSuitable.end(), this->_pScheduler) !=
inSuitable.end()) {
// Yes, run this task directly.
t.run();
} else {
// No, schedule this task with the deferred scheduler.
this->_pScheduler->schedule(std::move(t));
}
}
class SchedulerScope {
public:
SchedulerScope(TScheduler* pScheduler = nullptr) : _pScheduler(pScheduler) {
if (this->_pScheduler) {
std::vector<TScheduler*>& inSuitable =
ImmediateScheduler<TScheduler>::getSchedulersCurrentlyDispatching();
inSuitable.push_back(this->_pScheduler);
}
}
~SchedulerScope() noexcept { this->reset(); }
SchedulerScope(SchedulerScope&& rhs) noexcept
: _pScheduler(rhs._pScheduler) {
rhs._pScheduler = nullptr;
}
SchedulerScope& operator=(SchedulerScope&& rhs) noexcept {
std::swap(this->_pScheduler, rhs._pScheduler);
return *this;
}
void reset() noexcept {
if (this->_pScheduler) {
std::vector<TScheduler*>& inSuitable =
ImmediateScheduler<TScheduler>::getSchedulersCurrentlyDispatching();
CESIUM_ASSERT(!inSuitable.empty());
CESIUM_ASSERT(inSuitable.back() == this->_pScheduler);
inSuitable.pop_back();
this->_pScheduler = nullptr;
}
}
SchedulerScope(const SchedulerScope&) = delete;
SchedulerScope& operator=(const SchedulerScope&) = delete;
private:
TScheduler* _pScheduler;
};
SchedulerScope scope() { return SchedulerScope(this->_pScheduler); }
private:
TScheduler* _pScheduler;
// If a TScheduler instance is found in this thread-local vector, then the
// current thread has been dispatched by this scheduler and therefore we can
// dispatch immediately.
static std::vector<TScheduler*>&
getSchedulersCurrentlyDispatching() noexcept {
// We're using a static local here rather than a static field because, on
// at least some Linux systems (mine), with Clang 12, in a Debug build, a
// thread_local static field causes a SEGFAULT on access.
// I don't understand why (despite hours trying), but making it a static
// local instead solves the problem and is arguably cleaner, anyway.
static thread_local std::vector<TScheduler*> schedulersCurrentlyDispatching;
return schedulersCurrentlyDispatching;
}
};
//! @endcond
// End omitting doxygen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,95 @@
#pragma once
#include "ImmediateScheduler.h"
#include "cesium-async++.h"
#include <atomic>
namespace CesiumAsync {
// Begin omitting doxygen warnings for Impl namespace
//! @cond Doxygen_Suppress
namespace CesiumImpl {
class QueuedScheduler {
public:
QueuedScheduler();
~QueuedScheduler();
void schedule(async::task_run_handle t);
void dispatchQueuedContinuations();
bool dispatchZeroOrOneContinuation();
template <typename T> T dispatchUntilTaskCompletes(async::task<T>&& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes.
//
// We use the `isDone` flag as the loop termination condition to
// avoid a race condition that can lead to a deadlock. If we used
// `unblockTask.ready()` as the termination condition instead, then it's
// possible for events to happen as follows:
//
// 1. The original `task` completes in a worker thread and the `unblockTask`
// continuation is invoked immediately in the same thread.
// 2. The unblockTask continuation calls `unblock`, which terminates the
// `wait` on the condition variable in the main thread.
// 3. The main thread resumes and the while loop in this function spins back
// around and evaluates `unblockTask.ready()`. This returns false because
// the unblockTask continuation has not actually finished running in the
// worker thread yet. The main thread starts waiting on the condition
// variable again.
// 4. The `unblockTask` continuation finally finishes, making
// `unblockTask.ready()` return true, but it's too late. The main thread is
// already waiting on the condition variable.
//
// By setting the atomic `isDone` flag before calling `unblock`, we ensure
// that the loop termination condition is satisfied before the main thread
// is awoken, avoiding the potential deadlock.
std::atomic<bool> isDone = false;
async::task<T> unblockTask = task.then(
async::inline_scheduler(),
[this, &isDone](async::task<T>&& task) {
isDone = true;
this->unblock();
return task.get();
});
while (!isDone) {
this->dispatchInternal(true);
}
return std::move(unblockTask).get();
}
template <typename T>
T dispatchUntilTaskCompletes(const async::shared_task<T>& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes. This case is simpler than the one above because a SharedFuture
// supports multiple continuations. We can use readiness of the _original_
// task to terminate the loop while unblocking in a separate continuation
// guaranteed to run only after that termination condition is satisfied.
async::task<void> unblockTask = task.then(
async::inline_scheduler(),
[this](const async::shared_task<T>&) { this->unblock(); });
while (!task.ready()) {
this->dispatchInternal(true);
}
return task.get();
}
ImmediateScheduler<QueuedScheduler> immediate{this};
private:
bool dispatchInternal(bool blockIfNoTasks);
void unblock();
struct Impl;
std::unique_ptr<Impl> _pImpl;
};
//! @endcond
// End omitting doxygen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,45 @@
#pragma once
#include "cesium-async++.h"
namespace CesiumAsync {
template <class T> class Future;
template <class T> class SharedFuture;
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
template <typename T> struct RemoveFuture {
typedef T type;
};
template <typename T> struct RemoveFuture<Future<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<const Future<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<SharedFuture<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<const SharedFuture<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<async::task<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<const async::task<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<async::shared_task<T>> {
typedef T type;
};
template <typename T> struct RemoveFuture<const async::shared_task<T>> {
typedef T type;
};
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,27 @@
#pragma once
#include "../ITaskProcessor.h"
#include "ImmediateScheduler.h"
#include <memory>
namespace CesiumAsync {
namespace CesiumImpl {
// Begin omitting doxygen warnings for Impl namespace
//! @cond Doxygen_Suppress
class TaskScheduler {
public:
TaskScheduler(const std::shared_ptr<ITaskProcessor>& pTaskProcessor);
void schedule(async::task_run_handle t);
ImmediateScheduler<TaskScheduler> immediate{this};
private:
std::shared_ptr<ITaskProcessor> _pTaskProcessor;
};
//! @endcond
// End omitting doxygen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,124 @@
#pragma once
#include "unwrapFuture.h"
#include <CesiumUtility/Tracing.h>
namespace CesiumAsync {
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
template <typename T> struct WithTracing {
template <typename Func>
static auto
begin([[maybe_unused]] const char* tracingName, [[maybe_unused]] Func&& f) {
#if CESIUM_TRACING_ENABLED
return
[tracingName, CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()](T&& result) mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
}
return std::move(result);
};
#else
return CesiumImpl::unwrapFuture<Func, T>(std::forward<Func>(f));
#endif
}
template <typename Func>
static auto end([[maybe_unused]] const char* tracingName, Func&& f) {
#if CESIUM_TRACING_ENABLED
return [tracingName,
f = CesiumImpl::unwrapFuture<Func, T>(std::forward<Func>(f)),
CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()](T&& result) mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_END_IN_TRACK(tracingName);
}
return f(std::move(result));
};
#else
return CesiumImpl::unwrapFuture<Func, T>(std::forward<Func>(f));
#endif
}
};
template <typename T> struct WithTracingShared {
template <typename Func>
static auto
begin([[maybe_unused]] const char* tracingName, [[maybe_unused]] Func&& f) {
#if CESIUM_TRACING_ENABLED
return [tracingName,
CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()](const T& result) mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_BEGIN_IN_TRACK(tracingName);
}
return result;
};
#else
return CesiumImpl::unwrapSharedFuture<Func, T>(std::forward<Func>(f));
#endif
}
template <typename Func>
static auto end([[maybe_unused]] const char* tracingName, Func&& f) {
#if CESIUM_TRACING_ENABLED
return [tracingName,
f = CesiumImpl::unwrapSharedFuture<Func, T>(std::forward<Func>(f)),
CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()](const T& result) mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_END_IN_TRACK(tracingName);
}
return f(result);
};
#else
return CesiumImpl::unwrapSharedFuture<Func, T>(std::forward<Func>(f));
#endif
}
};
template <> struct WithTracing<void> {
template <typename Func>
static auto
begin([[maybe_unused]] const char* tracingName, [[maybe_unused]] Func&& f) {
#if CESIUM_TRACING_ENABLED
return [tracingName, CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()]() mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_END_IN_TRACK(tracingName);
}
};
#else
return CesiumImpl::unwrapFuture<Func>(std::forward<Func>(f));
#endif
}
template <typename Func>
static auto end([[maybe_unused]] const char* tracingName, Func&& f) {
#if CESIUM_TRACING_ENABLED
return [tracingName,
f = CesiumImpl::unwrapFuture<Func>(std::forward<Func>(f)),
CESIUM_TRACE_LAMBDA_CAPTURE_TRACK()]() mutable {
CESIUM_TRACE_USE_CAPTURED_TRACK();
if (tracingName) {
CESIUM_TRACE_END_IN_TRACK(tracingName);
}
return f();
};
#else
return CesiumImpl::unwrapFuture<Func>(std::forward<Func>(f));
#endif
}
};
// With a void Future, shared and non-shared are identical.
template <> struct WithTracingShared<void> : public WithTracing<void> {};
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,16 @@
#pragma once
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4458 4324 4702)
#endif
#ifndef _MSC_VER
#pragma GCC diagnostic ignored "-Wshadow"
#endif
#include <async++.h>
#ifdef _MSC_VER
#pragma warning(pop)
#endif

View File

@ -0,0 +1,77 @@
#pragma once
#include "ContinuationFutureType.h"
#include "ContinuationReturnType.h"
namespace CesiumAsync {
namespace CesiumImpl {
// Begin omitting doxgen warnings for Impl namespace
//! @cond Doxygen_Suppress
struct IdentityUnwrapper {
template <typename Func> static Func unwrap(Func&& f) {
return std::forward<Func>(f);
}
template <typename Func> static Func unwrapShared(Func&& f) {
return std::forward<Func>(f);
}
};
template <typename T> struct ParameterizedTaskUnwrapper {
template <typename Func> static auto unwrap(Func&& f) {
return [f = std::forward<Func>(f)](T&& t) mutable {
return f(std::move(t))._task;
};
}
template <typename Func> static auto unwrapShared(Func&& f) {
return
[f = std::forward<Func>(f)](const T& t) mutable { return f(t)._task; };
}
};
struct TaskUnwrapper {
template <typename Func> static auto unwrap(Func&& f) {
return [f = std::forward<Func>(f)]() mutable { return f()._task; };
}
};
template <typename Func, typename T> auto unwrapFuture(Func&& f) {
return std::conditional<
std::is_same<
typename ContinuationReturnType<Func, T>::type,
typename RemoveFuture<
typename ContinuationFutureType<Func, T>::type>::type>::value,
IdentityUnwrapper,
ParameterizedTaskUnwrapper<T>>::type::unwrap(std::forward<Func>(f));
}
template <typename Func, typename T> auto unwrapSharedFuture(Func&& f) {
return std::conditional<
std::is_same<
typename ContinuationReturnType<Func, T>::type,
typename RemoveFuture<
typename ContinuationFutureType<Func, T>::type>::type>::value,
IdentityUnwrapper,
ParameterizedTaskUnwrapper<T>>::type::unwrapShared(std::forward<Func>(f));
}
template <typename Func> auto unwrapFuture(Func&& f) {
return std::conditional<
std::is_same<
typename ContinuationReturnType<Func, void>::type,
typename RemoveFuture<
typename ContinuationFutureType<Func, void>::type>::type>::value,
IdentityUnwrapper,
TaskUnwrapper>::type::unwrap(std::forward<Func>(f));
}
template <typename Func> auto unwrapSharedFuture(Func&& f) {
return unwrapFuture(std::forward<Func>(f));
}
//! @endcond
// End omitting doxgen warnings for Impl namespace
} // namespace CesiumImpl
} // namespace CesiumAsync

View File

@ -0,0 +1,18 @@
#pragma once
/**
* @brief Classes that support asynchronous operations.
*
* @mermaid-interactive{dependencies/CesiumAsync}
*/
namespace CesiumAsync {}
#if defined(_WIN32) && defined(CESIUM_SHARED)
#ifdef CESIUMASYNC_BUILDING
#define CESIUMASYNC_API __declspec(dllexport)
#else
#define CESIUMASYNC_API __declspec(dllimport)
#endif
#else
#define CESIUMASYNC_API
#endif

View File

@ -0,0 +1,70 @@
#pragma once
#include <CesiumAsync/IAssetAccessor.h>
#include <CesiumUtility/Result.h>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>
namespace CesiumAsync {
class AsyncSystem;
/**
* @brief A description of an asset that can be loaded from the network using an
* {@link IAssetAccessor}. This includes a URL and any headers to be included
* in the request.
*/
struct NetworkAssetDescriptor {
/**
* @brief The URL from which this network asset is downloaded.
*/
std::string url;
/**
* @brief The HTTP headers used in requesting this asset.
*/
std::vector<IAssetAccessor::THeader> headers;
/**
* @brief Determines if this descriptor is identical to another one.
*/
bool operator==(const NetworkAssetDescriptor& rhs) const noexcept;
/**
* @brief Request this asset from the network using the provided asset
* accessor.
*
* @param asyncSystem The async system.
* @param pAssetAccessor The asset accessor.
* @return A future that resolves to the request once it is complete.
*/
Future<std::shared_ptr<CesiumAsync::IAssetRequest>> loadFromNetwork(
const CesiumAsync::AsyncSystem& asyncSystem,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor) const;
/**
* @brief Request this asset from the network using the provided asset
* accessor and return the downloaded bytes.
*
* @param asyncSystem The async system.
* @param pAssetAccessor The asset accessor.
* @return A future that resolves to the downloaded bytes once the request is
* complete.
*/
Future<CesiumUtility::Result<std::vector<std::byte>>> loadBytesFromNetwork(
const CesiumAsync::AsyncSystem& asyncSystem,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor) const;
};
} // namespace CesiumAsync
/** @brief Hash implementation for \ref CesiumAsync::NetworkAssetDescriptor. */
template <> struct std::hash<CesiumAsync::NetworkAssetDescriptor> {
/** @brief Returns a `size_t` hash of the provided \ref
* CesiumAsync::NetworkAssetDescriptor. */
std::size_t
operator()(const CesiumAsync::NetworkAssetDescriptor& key) const noexcept;
};

View File

@ -0,0 +1,124 @@
#pragma once
#include "Impl/AsyncSystemSchedulers.h"
#include "Impl/cesium-async++.h"
#include <exception>
#include <memory>
namespace CesiumAsync {
template <typename T> class Future;
/**
* @brief A promise that can be resolved or rejected by an asynchronous task.
*
* @tparam T The type of the object that the promise will be resolved with.
*/
template <typename T> class Promise {
public:
/**
* @brief Will be called when the task completed successfully.
*
* @param value The value that was computed by the asynchronous task.
*/
void resolve(T&& value) const { this->_pEvent->set(std::move(value)); }
/**
* @brief Will be called when the task completed successfully.
*
* @param value The value that was computed by the asynchronous task.
*/
void resolve(const T& value) const { this->_pEvent->set(value); }
/**
* @brief Will be called when the task failed.
*
* @param error The error that caused the task to fail.
*/
template <typename TException> void reject(TException error) const {
this->_pEvent->set_exception(std::make_exception_ptr(error));
}
/**
* @brief Will be called when the task failed.
*
* @param error The error, captured with `std::current_exception`, that
* caused the task to fail.
*/
void reject(const std::exception_ptr& error) const {
this->_pEvent->set_exception(error);
}
/**
* @brief Gets the Future that resolves or rejects when this Promise is
* resolved or rejected.
*
* This method may only be called once.
*
* @return The future.
*/
Future<T> getFuture() const {
return Future<T>(this->_pSchedulers, this->_pEvent->get_task());
}
private:
Promise(
const std::shared_ptr<CesiumImpl::AsyncSystemSchedulers>& pSchedulers,
const std::shared_ptr<async::event_task<T>>& pEvent) noexcept
: _pSchedulers(pSchedulers), _pEvent(pEvent) {}
std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
std::shared_ptr<async::event_task<T>> _pEvent;
friend class AsyncSystem;
};
/**
* @brief Specialization for promises that resolve to no value.
*/
template <> class Promise<void> {
public:
/**
* @brief Will be called when the task completed successfully.
*/
void resolve() const { this->_pEvent->set(); }
/**
* @brief Will be called when the task failed.
*
* @param error The error that caused the task to fail.
*/
template <typename TException> void reject(TException error) const {
this->_pEvent->set_exception(std::make_exception_ptr(error));
}
/**
* @brief Will be called when the task failed.
*
* @param error The error, captured with `std::current_exception`, that
* caused the task to fail.
*/
void reject(const std::exception_ptr& error) const {
this->_pEvent->set_exception(error);
}
/**
* @copydoc Promise::getFuture
*/
Future<void> getFuture() const {
return Future<void>(this->_pSchedulers, this->_pEvent->get_task());
}
private:
Promise(
const std::shared_ptr<CesiumImpl::AsyncSystemSchedulers>& pSchedulers,
const std::shared_ptr<async::event_task<void>>& pEvent) noexcept
: _pSchedulers(pSchedulers), _pEvent(pEvent) {}
std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
std::shared_ptr<async::event_task<void>> _pEvent;
friend class AsyncSystem;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,562 @@
#pragma once
#include <CesiumAsync/AsyncSystem.h>
#include <CesiumAsync/Future.h>
#include <CesiumAsync/IAssetAccessor.h>
#include <CesiumUtility/DoublyLinkedList.h>
#include <CesiumUtility/IDepotOwningAsset.h>
#include <CesiumUtility/IntrusivePointer.h>
#include <CesiumUtility/ReferenceCounted.h>
#include <CesiumUtility/Result.h>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
namespace CesiumUtility {
template <typename T> class SharedAsset;
}
namespace CesiumAsync {
/**
* @brief A depot for {@link CesiumUtility::SharedAsset} instances, which are potentially shared between multiple objects.
*
* @tparam TAssetType The type of asset stored in this depot. This should
* be derived from {@link CesiumUtility::SharedAsset}.
*/
template <typename TAssetType, typename TAssetKey>
class CESIUMASYNC_API SharedAssetDepot
: public CesiumUtility::ReferenceCountedThreadSafe<
SharedAssetDepot<TAssetType, TAssetKey>>,
public CesiumUtility::IDepotOwningAsset<TAssetType> {
public:
/**
* @brief The maximum total byte usage of assets that have been loaded but are
* no longer needed.
*
* When cached assets are no longer needed, they're marked as
* candidates for deletion. However, this deletion doesn't actually occur
* until the total byte usage of deletion candidates exceeds this threshold.
* At that point, assets are cleaned up in the order that they were marked for
* deletion until the total dips below this threshold again.
*
* Default is 16MiB.
*/
int64_t inactiveAssetSizeLimitBytes = 16 * 1024 * 1024;
/**
* @brief Signature for the callback function that will be called to fetch and
* create a new instance of `TAssetType` if one with the given key doesn't
* already exist in the depot.
*
* @param asyncSystem The \ref AsyncSystem used by this \ref SharedAssetDepot.
* @param pAssetAccessor The \ref IAssetAccessor used by this \ref
* SharedAssetDepot. Use this to fetch the asset.
* @param key The `TAssetKey` for the asset that should be loaded by this
* factory.
* @returns A \ref CesiumAsync::Future "Future" that resolves to a \ref
* CesiumUtility::ResultPointer "ResultPointer" containing the loaded asset,
* or any error information if the asset failed to load.
*/
using FactorySignature =
CesiumAsync::Future<CesiumUtility::ResultPointer<TAssetType>>(
const AsyncSystem& asyncSystem,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
const TAssetKey& key);
/**
* @brief Creates a new `SharedAssetDepot` using the given factory callback to
* load new assets.
*
* @param factory The factory to use to fetch and create assets that don't
* already exist in the depot. See \ref FactorySignature.
*/
SharedAssetDepot(std::function<FactorySignature> factory);
virtual ~SharedAssetDepot();
/**
* @brief Gets an asset from the depot if it already exists, or creates it
* using the depot's factory if it does not.
*
* @param asyncSystem The async system.
* @param pAssetAccessor The asset accessor to use to download assets, if
* necessary.
* @param assetKey The key uniquely identifying the asset to get or create.
* @return A shared future that resolves when the asset is ready or fails.
*/
SharedFuture<CesiumUtility::ResultPointer<TAssetType>> getOrCreate(
const AsyncSystem& asyncSystem,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
const TAssetKey& assetKey);
/**
* @brief Returns the total number of distinct assets contained in this depot,
* including both active and inactive assets.
*/
size_t getAssetCount() const;
/**
* @brief Gets the number of assets owned by this depot that are active,
* meaning that they are currently being used in one or more places.
*/
size_t getActiveAssetCount() const;
/**
* @brief Gets the number of assets owned by this depot that are inactive,
* meaning that they are not currently being used.
*/
size_t getInactiveAssetCount() const;
/**
* @brief Gets the total bytes used by inactive (unused) assets owned by this
* depot.
*/
int64_t getInactiveAssetTotalSizeBytes() const;
private:
struct LockHolder;
// Disable copy
void operator=(const SharedAssetDepot<TAssetType, TAssetKey>& other) = delete;
/**
* @brief Locks the shared asset depot for thread-safe access. It will remain
* locked until the returned object is destroyed or the `unlock` method is
* called on it.
*/
LockHolder lock() const;
/**
* @brief Marks the given asset as a candidate for deletion.
* Should only be called by {@link SharedAsset}. May be called from any thread.
*
* @param asset The asset to mark for deletion.
* @param threadOwnsDepotLock True if the calling thread already owns the
* depot lock; otherwise, false.
*/
void markDeletionCandidate(const TAssetType& asset, bool threadOwnsDepotLock)
override;
void markDeletionCandidateUnderLock(const TAssetType& asset);
/**
* @brief Unmarks the given asset as a candidate for deletion.
* Should only be called by {@link SharedAsset}. May be called from any thread.
*
* @param asset The asset to unmark for deletion.
* @param threadOwnsDepotLock True if the calling thread already owns the
* depot lock; otherwise, false.
*/
void unmarkDeletionCandidate(
const TAssetType& asset,
bool threadOwnsDepotLock) override;
void unmarkDeletionCandidateUnderLock(const TAssetType& asset);
/**
* @brief An entry for an asset owned by this depot. This is reference counted
* so that we can keep it alive during async operations.
*/
struct AssetEntry
: public CesiumUtility::ReferenceCountedThreadSafe<AssetEntry> {
AssetEntry(TAssetKey&& key_)
: CesiumUtility::ReferenceCountedThreadSafe<AssetEntry>(),
key(std::move(key_)),
pAsset(),
maybePendingAsset(),
errorsAndWarnings(),
sizeInDeletionList(0),
deletionListPointers() {}
AssetEntry(const TAssetKey& key_) : AssetEntry(TAssetKey(key_)) {}
/**
* @brief The unique key identifying this asset.
*/
TAssetKey key;
/**
* @brief A pointer to the asset. This may be nullptr if the asset is still
* being loaded, or if it failed to load.
*/
std::unique_ptr<TAssetType> pAsset;
/**
* @brief If this asset is currently loading, this field holds a shared
* future that will resolve when the asset load is complete. This field will
* be empty if the asset finished loading, including if it failed to load.
*/
std::optional<SharedFuture<CesiumUtility::ResultPointer<TAssetType>>>
maybePendingAsset;
/**
* @brief The errors and warnings that occurred while loading this asset.
* This will not contain any errors or warnings if the asset has not
* finished loading yet.
*/
CesiumUtility::ErrorList errorsAndWarnings;
/**
* @brief The size of this asset when it was added to the
* _deletionCandidates list. This is stored so that the exact same size can
* be subtracted later. The value of this field is undefined if the asset is
* not currently in the _deletionCandidates list.
*/
int64_t sizeInDeletionList;
/**
* @brief The next and previous pointers to entries in the
* _deletionCandidates list.
*/
CesiumUtility::DoublyLinkedListPointers<AssetEntry> deletionListPointers;
CesiumUtility::ResultPointer<TAssetType> toResultUnderLock() const;
};
// Manages the depot's mutex. Also ensures, via IntrusivePointer, that the
// depot won't be destroyed while the lock is held.
struct LockHolder {
LockHolder(
const CesiumUtility::IntrusivePointer<const SharedAssetDepot>& pDepot);
~LockHolder();
void unlock();
private:
// These two fields _must_ be declared in this order to guarantee that the
// mutex is released before the depot pointer. Releasing the depot pointer
// could destroy the depot, and that will be disastrous if the lock is still
// held.
CesiumUtility::IntrusivePointer<const SharedAssetDepot> pDepot;
std::unique_lock<std::mutex> lock;
};
// Maps asset keys to AssetEntry instances. This collection owns the asset
// entries.
std::unordered_map<TAssetKey, CesiumUtility::IntrusivePointer<AssetEntry>>
_assets;
// Maps asset pointers to AssetEntry instances. The values in this map refer
// to instances owned by the _assets map.
std::unordered_map<TAssetType*, AssetEntry*> _assetsByPointer;
// List of assets that are being considered for deletion, in the order that
// they became unused.
CesiumUtility::DoublyLinkedList<AssetEntry, &AssetEntry::deletionListPointers>
_deletionCandidates;
// The total amount of memory used by all assets in the _deletionCandidates
// list.
int64_t _totalDeletionCandidateMemoryUsage;
// Mutex serializing access to _assets, _assetsByPointer, _deletionCandidates,
// and any AssetEntry owned by this depot.
mutable std::mutex _mutex;
// The factory used to create new AssetType instances.
std::function<FactorySignature> _factory;
// This instance keeps a reference to itself whenever it is managing active
// assets, preventing it from being destroyed even if all other references to
// it are dropped.
CesiumUtility::IntrusivePointer<SharedAssetDepot<TAssetType, TAssetKey>>
_pKeepAlive;
};
template <typename TAssetType, typename TAssetKey>
SharedAssetDepot<TAssetType, TAssetKey>::SharedAssetDepot(
std::function<FactorySignature> factory)
: _assets(),
_assetsByPointer(),
_deletionCandidates(),
_totalDeletionCandidateMemoryUsage(0),
_mutex(),
_factory(std::move(factory)),
_pKeepAlive(nullptr) {}
template <typename TAssetType, typename TAssetKey>
SharedAssetDepot<TAssetType, TAssetKey>::~SharedAssetDepot() {
// Ideally, when the depot is destroyed, all the assets it owns would become
// independent assets. But this is extremely difficult to manage in a
// thread-safe manner.
// Since we're in the destructor, we can be sure no one has a reference to
// this instance anymore. That means that no other thread can be executing
// `getOrCreate`, and no async asset creations are in progress.
// However, if assets owned by this depot are still alive, then other
// threads can still be calling addReference / releaseReference on some of
// our assets even while we're running the depot's destructor. Which means
// that we can end up in `markDeletionCandidate` at the same time the
// destructor is running. And in fact it's possible for a `SharedAsset` with
// especially poor timing to call into a `SharedAssetDepot` just after it is
// destroyed.
// To avoid this, we use the _pKeepAlive field to maintain an artificial
// reference to this depot whenever it owns live assets. This should keep
// this destructor from being called except when all of its assets are also
// in the _deletionCandidates list.
CESIUM_ASSERT(this->_assets.size() == this->_deletionCandidates.size());
}
template <typename TAssetType, typename TAssetKey>
SharedFuture<CesiumUtility::ResultPointer<TAssetType>>
SharedAssetDepot<TAssetType, TAssetKey>::getOrCreate(
const AsyncSystem& asyncSystem,
const std::shared_ptr<IAssetAccessor>& pAssetAccessor,
const TAssetKey& assetKey) {
// We need to take care here to avoid two assets starting to load before the
// first asset has added an entry and set its maybePendingAsset field.
LockHolder lock = this->lock();
auto existingIt = this->_assets.find(assetKey);
if (existingIt != this->_assets.end()) {
// We've already loaded (or are loading) an asset with this ID - we can
// just use that.
const AssetEntry& entry = *existingIt->second;
if (entry.maybePendingAsset) {
// Asset is currently loading.
return *entry.maybePendingAsset;
} else {
return asyncSystem.createResolvedFuture(entry.toResultUnderLock())
.share();
}
}
// Calling the factory function while holding the mutex unnecessarily
// limits parallelism. It can even lead to a bug in the scenario where the
// `thenInWorkerThread` continuation is invoked immediately in the current
// thread, before `thenInWorkerThread` itself returns. That would result
// in an attempt to lock the mutex recursively, which is not allowed.
// So we jump through some hoops here to publish "this thread is working
// on it", then unlock the mutex, and _then_ actually call the factory
// function.
Promise<void> promise = asyncSystem.createPromise<void>();
// We haven't loaded or started to load this asset yet.
// Let's do that now.
CesiumUtility::IntrusivePointer<SharedAssetDepot<TAssetType, TAssetKey>>
pDepot = this;
CesiumUtility::IntrusivePointer<AssetEntry> pEntry = new AssetEntry(assetKey);
auto future =
promise.getFuture()
.thenImmediately([pDepot, pEntry, asyncSystem, pAssetAccessor]() {
return pDepot->_factory(asyncSystem, pAssetAccessor, pEntry->key);
})
.catchImmediately([](std::exception&& e) {
return CesiumUtility::Result<
CesiumUtility::IntrusivePointer<TAssetType>>(
CesiumUtility::ErrorList::error(
std::string("Error creating asset: ") + e.what()));
})
.thenInWorkerThread(
[pDepot,
pEntry](CesiumUtility::Result<
CesiumUtility::IntrusivePointer<TAssetType>>&& result) {
LockHolder lock = pDepot->lock();
if (result.pValue) {
result.pValue->_pDepot = pDepot.get();
pDepot->_assetsByPointer[result.pValue.get()] = pEntry.get();
}
// Now that this asset is owned by the depot, we exclusively
// control its lifetime with a std::unique_ptr.
pEntry->pAsset =
std::unique_ptr<TAssetType>(result.pValue.get());
pEntry->errorsAndWarnings = std::move(result.errors);
pEntry->maybePendingAsset.reset();
// The asset is initially live because we have an
// IntrusivePointer to it right here. So make sure the depot
// stays alive, too.
pDepot->_pKeepAlive = pDepot;
return pEntry->toResultUnderLock();
});
SharedFuture<CesiumUtility::ResultPointer<TAssetType>> sharedFuture =
std::move(future).share();
pEntry->maybePendingAsset = sharedFuture;
[[maybe_unused]] bool added = this->_assets.emplace(assetKey, pEntry).second;
// Should always be added successfully, because we checked above that the
// asset key doesn't exist in the map yet.
CESIUM_ASSERT(added);
// Unlock the mutex and then call the factory function.
lock.unlock();
promise.resolve();
return sharedFuture;
}
template <typename TAssetType, typename TAssetKey>
size_t SharedAssetDepot<TAssetType, TAssetKey>::getAssetCount() const {
LockHolder lock = this->lock();
return this->_assets.size();
}
template <typename TAssetType, typename TAssetKey>
size_t SharedAssetDepot<TAssetType, TAssetKey>::getActiveAssetCount() const {
LockHolder lock = this->lock();
return this->_assets.size() - this->_deletionCandidates.size();
}
template <typename TAssetType, typename TAssetKey>
size_t SharedAssetDepot<TAssetType, TAssetKey>::getInactiveAssetCount() const {
LockHolder lock = this->lock();
return this->_deletionCandidates.size();
}
template <typename TAssetType, typename TAssetKey>
int64_t
SharedAssetDepot<TAssetType, TAssetKey>::getInactiveAssetTotalSizeBytes()
const {
LockHolder lock = this->lock();
return this->_totalDeletionCandidateMemoryUsage;
}
template <typename TAssetType, typename TAssetKey>
typename SharedAssetDepot<TAssetType, TAssetKey>::LockHolder
SharedAssetDepot<TAssetType, TAssetKey>::lock() const {
return LockHolder{this};
}
template <typename TAssetType, typename TAssetKey>
void SharedAssetDepot<TAssetType, TAssetKey>::markDeletionCandidate(
const TAssetType& asset,
bool threadOwnsDepotLock) {
if (threadOwnsDepotLock) {
this->markDeletionCandidateUnderLock(asset);
} else {
LockHolder lock = this->lock();
this->markDeletionCandidateUnderLock(asset);
}
}
template <typename TAssetType, typename TAssetKey>
void SharedAssetDepot<TAssetType, TAssetKey>::markDeletionCandidateUnderLock(
const TAssetType& asset) {
auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
CESIUM_ASSERT(it != this->_assetsByPointer.end());
if (it == this->_assetsByPointer.end()) {
return;
}
CESIUM_ASSERT(it->second != nullptr);
AssetEntry& entry = *it->second;
entry.sizeInDeletionList = asset.getSizeBytes();
this->_totalDeletionCandidateMemoryUsage += entry.sizeInDeletionList;
this->_deletionCandidates.insertAtTail(entry);
if (this->_totalDeletionCandidateMemoryUsage >
this->inactiveAssetSizeLimitBytes) {
// Delete the deletion candidates until we're below the limit.
while (this->_deletionCandidates.size() > 0 &&
this->_totalDeletionCandidateMemoryUsage >
this->inactiveAssetSizeLimitBytes) {
AssetEntry* pOldEntry = this->_deletionCandidates.head();
this->_deletionCandidates.remove(*pOldEntry);
this->_totalDeletionCandidateMemoryUsage -= pOldEntry->sizeInDeletionList;
CESIUM_ASSERT(
pOldEntry->pAsset == nullptr ||
pOldEntry->pAsset->_referenceCount == 0);
if (pOldEntry->pAsset) {
this->_assetsByPointer.erase(pOldEntry->pAsset.get());
}
// This will actually delete the asset.
this->_assets.erase(pOldEntry->key);
}
}
// If this depot is not managing any live assets, then we no longer need to
// keep it alive.
if (this->_assets.size() == this->_deletionCandidates.size()) {
this->_pKeepAlive.reset();
}
}
template <typename TAssetType, typename TAssetKey>
void SharedAssetDepot<TAssetType, TAssetKey>::unmarkDeletionCandidate(
const TAssetType& asset,
bool threadOwnsDepotLock) {
if (threadOwnsDepotLock) {
this->unmarkDeletionCandidateUnderLock(asset);
} else {
LockHolder lock = this->lock();
this->unmarkDeletionCandidateUnderLock(asset);
}
}
template <typename TAssetType, typename TAssetKey>
void SharedAssetDepot<TAssetType, TAssetKey>::unmarkDeletionCandidateUnderLock(
const TAssetType& asset) {
auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
CESIUM_ASSERT(it != this->_assetsByPointer.end());
if (it == this->_assetsByPointer.end()) {
return;
}
CESIUM_ASSERT(it->second != nullptr);
AssetEntry& entry = *it->second;
bool isFound = this->_deletionCandidates.contains(entry);
CESIUM_ASSERT(isFound);
if (isFound) {
this->_totalDeletionCandidateMemoryUsage -= entry.sizeInDeletionList;
this->_deletionCandidates.remove(entry);
}
// This depot is now managing at least one live asset, so keep it alive.
this->_pKeepAlive = this;
}
template <typename TAssetType, typename TAssetKey>
CesiumUtility::ResultPointer<TAssetType>
SharedAssetDepot<TAssetType, TAssetKey>::AssetEntry::toResultUnderLock() const {
// This method is called while the calling thread already owns the depot
// mutex. So we must take care not to lock it again, which could happen if
// the asset is currently unreferenced and we naively create an
// IntrusivePointer for it.
CesiumUtility::IntrusivePointer<TAssetType> p = nullptr;
if (pAsset) {
pAsset->addReference(true);
p = pAsset.get();
pAsset->releaseReference(true);
}
return CesiumUtility::ResultPointer<TAssetType>(p, errorsAndWarnings);
}
template <typename TAssetType, typename TAssetKey>
SharedAssetDepot<TAssetType, TAssetKey>::LockHolder::LockHolder(
const CesiumUtility::IntrusivePointer<const SharedAssetDepot>& pDepot_)
: pDepot(pDepot_), lock(pDepot_->_mutex) {}
template <typename TAssetType, typename TAssetKey>
SharedAssetDepot<TAssetType, TAssetKey>::LockHolder::~LockHolder() = default;
template <typename TAssetType, typename TAssetKey>
void SharedAssetDepot<TAssetType, TAssetKey>::LockHolder::unlock() {
this->lock.unlock();
}
} // namespace CesiumAsync

View File

@ -0,0 +1,345 @@
#pragma once
#include "Impl/AsyncSystemSchedulers.h"
#include "Impl/CatchFunction.h"
#include "Impl/ContinuationFutureType.h"
#include "Impl/WithTracing.h"
#include "ThreadPool.h"
#include <CesiumUtility/Tracing.h>
#include <type_traits>
#include <variant>
namespace CesiumAsync {
namespace CesiumImpl {
template <typename R> struct ParameterizedTaskUnwrapper;
struct TaskUnwrapper;
} // namespace CesiumImpl
/**
* @brief A value that will be available in the future, as produced by
* {@link AsyncSystem}. Unlike {@link Future}, a `SharedFuture` allows
* multiple continuations to be attached, and allows {@link SharedFuture::wait}
* to be called multiple times.
*
* @tparam T The type of the value.
*/
template <typename T> class SharedFuture final {
public:
/**
* @brief Registers a continuation function to be invoked in a worker thread
* when this Future resolves.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this Future is resolved from a designated worker thread, the
* continuation function will be invoked immediately rather than in a
* separate task. Similarly, if the Future is already resolved when
* `thenInWorkerThread` is called from a designated worker thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T> thenInWorkerThread(Func&& f) {
return this->thenWithScheduler(
this->_pSchedulers->workerThread.immediate,
"waiting for worker thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked in the main thread
* when this Future resolves.
*
* If this Future is resolved from the main thread, the
* continuation function will be invoked immediately rather than queued for
* later execution in the main thread. Similarly, if the Future is already
* resolved when `thenInMainThread` is called from the main thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T> thenInMainThread(Func&& f) {
return this->thenWithScheduler(
this->_pSchedulers->mainThread.immediate,
"waiting for main thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked immediately in
* whichever thread causes the Future to be resolved.
*
* If the Future is already resolved, the supplied function will be called
* immediately in the calling thread and this method will not return until
* that function does.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T> thenImmediately(Func&& f) {
return CesiumImpl::ContinuationFutureType_t<Func, T>(
this->_pSchedulers,
_task.then(
async::inline_scheduler(),
CesiumImpl::WithTracingShared<T>::end(
nullptr,
std::forward<Func>(f))));
}
/**
* @brief Registers a continuation function to be invoked in a thread pool
* when this Future resolves.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* If this Future is resolved from a thread pool thread, the
* continuation function will be invoked immediately rather than in a
* separate task. Similarly, if the Future is already resolved when
* `thenInThreadPool` is called from a designated thread pool thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* @tparam Func The type of the function.
* @param threadPool The thread pool where this function will be invoked.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func>
CesiumImpl::ContinuationFutureType_t<Func, T>
thenInThreadPool(const ThreadPool& threadPool, Func&& f) {
return this->thenWithScheduler(
threadPool._pScheduler->immediate,
"waiting for thread pool thread",
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked in the main thread
* when this Future rejects.
*
* If this Future is rejected from the main thread, the
* continuation function will be invoked immediately rather than queued for
* later execution in the main thread. Similarly, if the Future is already
* rejected when `catchInMainThread` is called from the main thread, the
* continuation function will be invoked immediately before this
* method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* Any `then` continuations chained after this one will be invoked with the
* return value of the catch callback.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func> Future<T> catchInMainThread(Func&& f) {
return this->catchWithScheduler(
this->_pSchedulers->mainThread.immediate,
std::forward<Func>(f));
}
/**
* @brief Registers a continuation function to be invoked immediately, and
* invalidates this Future.
*
* When this Future is rejected, the continuation function will be invoked
* in whatever thread does the rejection. Similarly, if the Future is already
* rejected when `catchImmediately` is called, the continuation function will
* be invoked immediately before this method returns.
*
* If the function itself returns a `Future`, the function will not be
* considered complete until that returned `Future` also resolves.
*
* Any `then` continuations chained after this one will be invoked with the
* return value of the catch callback.
*
* @tparam Func The type of the function.
* @param f The function.
* @return A future that resolves after the supplied function completes.
*/
template <typename Func> Future<T> catchImmediately(Func&& f) {
return this->catchWithScheduler(
async::inline_scheduler(),
std::forward<Func>(f));
}
/**
* @brief Passes through one or more additional values to the next
* continuation.
*
* The next continuation will receive a tuple with each of the provided
* values, followed by the result of the current Future.
*
* @tparam TPassThrough The types to pass through to the next continuation.
* @param values The values to pass through to the next continuation.
* @return A new Future that resolves to a tuple with the pass-through values,
* followed by the result of the last Future.
*/
template <typename... TPassThrough>
Future<std::tuple<TPassThrough..., T>>
thenPassThrough(TPassThrough&&... values) {
return this->thenImmediately(
[values = std::tuple(std::forward<TPassThrough>(values)...)](
const T& result) mutable {
return std::tuple_cat(std::move(values), std::make_tuple(result));
});
}
/**
* @brief Waits for the future to resolve or reject and returns the result.
*
* \attention This method must not be called from the main thread, the one
* that calls {@link AsyncSystem::dispatchMainThreadTasks}. Doing so can lead to a
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
template <
typename U = T,
std::enable_if_t<std::is_same_v<U, T>, int> = 0,
std::enable_if_t<!std::is_same_v<U, void>, int> = 0>
const U& wait() const {
return this->_task.get();
}
/**
* @brief Waits for the future to resolve or reject.
*
* \attention This method must not be called from the main thread, the one
* that calls {@link AsyncSystem::dispatchMainThreadTasks}. Doing so can lead to a
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @throws An exception if the future rejected.
*/
template <
typename U = T,
std::enable_if_t<std::is_same_v<U, T>, int> = 0,
std::enable_if_t<std::is_same_v<U, void>, int> = 0>
void wait() const {
this->_task.get();
}
/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}
/**
* @brief Determines if this future is already resolved or rejected.
*
* If this method returns true, it is guaranteed that {@link wait} will
* not block but will instead immediately return a value or throw an
* exception.
*
* @return True if the future is already resolved or rejected and {@link wait}
* will not block; otherwise, false.
*/
bool isReady() const { return this->_task.ready(); }
private:
SharedFuture(
const std::shared_ptr<CesiumImpl::AsyncSystemSchedulers>& pSchedulers,
async::shared_task<T>&& task) noexcept
: _pSchedulers(pSchedulers), _task(std::move(task)) {}
template <typename Func, typename Scheduler>
CesiumImpl::ContinuationFutureType_t<Func, T>
thenWithScheduler(Scheduler& scheduler, const char* tracingName, Func&& f) {
// It would be nice if tracingName were a template parameter instead of a
// function parameter, but that triggers a bug in VS2017. It was previously
// a bug in VS2019, too, but has been fixed there:
// https://developercommunity.visualstudio.com/t/internal-compiler-error-when-compiling-a-template-1/534210
#if CESIUM_TRACING_ENABLED
// When tracing is enabled, we measure the time between scheduling and
// dispatching of the work.
auto task = this->_task.then(
async::inline_scheduler(),
CesiumImpl::WithTracingShared<T>::begin(
tracingName,
std::forward<Func>(f)));
#else
auto& task = this->_task;
#endif
return CesiumImpl::ContinuationFutureType_t<Func, T>(
this->_pSchedulers,
task.then(
scheduler,
CesiumImpl::WithTracingShared<T>::end(
tracingName,
std::forward<Func>(f))));
}
template <typename Func, typename Scheduler>
CesiumImpl::ContinuationFutureType_t<Func, std::exception>
catchWithScheduler(Scheduler& scheduler, Func&& f) {
return CesiumImpl::ContinuationFutureType_t<Func, std::exception>(
this->_pSchedulers,
this->_task.then(
async::inline_scheduler(),
CesiumImpl::
CatchFunction<Func, T, Scheduler, const async::shared_task<T>&>{
scheduler,
std::forward<Func>(f)}));
}
std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
async::shared_task<T> _task;
friend class AsyncSystem;
template <typename R> friend struct CesiumImpl::ParameterizedTaskUnwrapper;
friend struct CesiumImpl::TaskUnwrapper;
template <typename R> friend class Future;
template <typename R> friend class SharedFuture;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,64 @@
#pragma once
#include "ICacheDatabase.h"
#include <spdlog/fwd.h>
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
namespace CesiumAsync {
/**
* @brief Cache storage using SQLITE to store completed response.
*/
class CESIUMASYNC_API SqliteCache : public ICacheDatabase {
public:
/**
* @brief Constructs a new instance with a given `databaseName` pointing to a
* database.
*
* The instance will connect to the existing database or create a new one if
* it doesn't exist
*
* @param pLogger The logger that receives error messages.
* @param databaseName the database path.
* @param maxItems the maximum number of items should be kept in the database
* after prunning.
*/
SqliteCache(
const std::shared_ptr<spdlog::logger>& pLogger,
const std::string& databaseName,
uint64_t maxItems = 4096);
~SqliteCache();
/** @copydoc ICacheDatabase::getEntry*/
virtual std::optional<CacheItem>
getEntry(const std::string& key) const override;
/** @copydoc ICacheDatabase::storeEntry*/
virtual bool storeEntry(
const std::string& key,
std::time_t expiryTime,
const std::string& url,
const std::string& requestMethod,
const HttpHeaders& requestHeaders,
uint16_t statusCode,
const HttpHeaders& responseHeaders,
const std::span<const std::byte>& responseData) override;
/** @copydoc ICacheDatabase::prune*/
virtual bool prune() override;
/** @copydoc ICacheDatabase::clearAll*/
virtual bool clearAll() override;
private:
struct Impl;
std::unique_ptr<Impl> _pImpl;
void createConnection() const;
void destroyDatabase();
};
} // namespace CesiumAsync

View File

@ -0,0 +1,62 @@
#pragma once
#include "cesium-sqlite3.h"
#include <memory>
#include <string>
struct CESIUM_SQLITE(sqlite3);
struct CESIUM_SQLITE(sqlite3_stmt);
namespace CesiumAsync {
/**
* @brief A deleter that can be used with `std::unique_ptr` to properly destroy
* a SQLite connection when it is no longer needed.
*/
struct DeleteSqliteConnection {
/** @brief Closes the provided sqlite connection when called. */
void operator()(CESIUM_SQLITE(sqlite3*) pConnection) noexcept;
};
/**
* @brief A deleter that can be used with `std::unique_ptr` to properly destroy
* a SQLite prepared statement when it is no longer needed.
*/
struct DeleteSqliteStatement {
/** @brief Finalizes the provided sqlite statement when called. */
void operator()(CESIUM_SQLITE(sqlite3_stmt*) pStatement) noexcept;
};
/**
* @brief A `std::unique_ptr<sqlite3>` that will properly delete the connection
* using the SQLite API.
*/
using SqliteConnectionPtr =
std::unique_ptr<CESIUM_SQLITE(sqlite3), DeleteSqliteConnection>;
/**
* @brief A `std::unique_ptr<sqlite3_stmt>` that will properly delete the
* statement using the SQLite API.
*/
using SqliteStatementPtr =
std::unique_ptr<CESIUM_SQLITE(sqlite3_stmt), DeleteSqliteStatement>;
/**
* @brief Helper functions for working with SQLite.
*/
struct SqliteHelper {
/**
* @brief Create a prepared statement.
*
* @param pConnection The SQLite connection in which to create the prepared
* statement.
* @param sql The SQL text for the statement.
* @return The created prepared statement.
*/
static SqliteStatementPtr prepareStatement(
const SqliteConnectionPtr& pConnection,
const std::string& sql);
};
} // namespace CesiumAsync

View File

@ -0,0 +1,56 @@
#pragma once
#include "Impl/ImmediateScheduler.h"
#include "Impl/cesium-async++.h"
#include "Library.h"
#include <memory>
namespace CesiumAsync {
/**
* @brief A thread pool created by {@link AsyncSystem::createThreadPool}.
*
* This object has no public methods, but can be used with
* {@link AsyncSystem::runInThreadPool} and
* {@link Future::thenInThreadPool}.
*/
class CESIUMASYNC_API ThreadPool {
public:
/**
* @brief Creates a new thread pool with the given number of threads.
*
* @param numberOfThreads The number of threads to create in this ThreadPool.
*/
ThreadPool(int32_t numberOfThreads);
private:
struct Scheduler {
Scheduler(int32_t numberOfThreads);
void schedule(async::task_run_handle t);
CesiumImpl::ImmediateScheduler<Scheduler> immediate{this};
async::threadpool_scheduler scheduler;
};
static auto createPreRun(ThreadPool::Scheduler* pScheduler) {
return
[pScheduler]() { ThreadPool::_scope = pScheduler->immediate.scope(); };
}
static auto createPostRun() noexcept {
return []() noexcept { ThreadPool::_scope.reset(); };
}
static thread_local CesiumImpl::ImmediateScheduler<Scheduler>::SchedulerScope
_scope;
std::shared_ptr<Scheduler> _pScheduler;
template <typename T> friend class Future;
template <typename T> friend class SharedFuture;
friend class AsyncSystem;
};
} // namespace CesiumAsync

View File

@ -0,0 +1,7 @@
#pragma once
#if PRIVATE_CESIUM_SQLITE
#define CESIUM_SQLITE(name) cesium_##name
#else
#define CESIUM_SQLITE(name) name
#endif