//! Provides [`FdbDatabase`], a handle to an FDB database, together with its transactional `run` and `read` helpers.
use bytes::{BufMut, Bytes, BytesMut};

use tokio_stream::StreamExt;

use std::future::Future;
use std::ptr::{self, NonNull};
use std::sync::Arc;

use crate::database::DatabaseOption;
use crate::error::{check, FdbError, FdbResult};
use crate::range::{Range, RangeOptions};
use crate::transaction::{
    FdbReadTransaction, FdbTransaction, ReadTransaction, Transaction, TransactionOption,
};
use crate::Key;

#[cfg(feature = "fdb-7_1")]
use std::convert::TryInto;

#[cfg(feature = "fdb-7_1")]
use crate::Tenant;

#[cfg(feature = "fdb-7_1")]
use crate::tenant::FdbTenant;

/// A handle to FDB database. All reads and writes to the database are
/// transactional.
///
/// The database is a mutable, lexicographically ordered mapping from
/// binary keys to binary values.
///
/// [`FdbTransaction`]s are used to manipulate data within a single
/// [`FdbDatabase`] - multiple concurrent [`FdbTransaction`]s on a
/// [`FdbDatabase`] enforce **ACID** properties.
///
/// The simplest correct programs using FDB will make use of the
/// [`run`] and [`read`] methods. [`run`] will call [`commit`] after
/// the user code has been executed.
///
/// A [`FdbDatabase`] can be created using the [`open_database`]
/// function. Cloning a [`FdbDatabase`] clones the inner `Arc`, so all
/// clones refer to the same underlying C handle.
///
/// [`commit`]: FdbTransaction::commit
/// [`read`]: FdbDatabase::read
/// [`run`]: FdbDatabase::run
/// [`open_database`]: crate::open_database
//
// *NOTE*: If you make changes to this type, make sure you update
//         tests for `DummyFdbDatabase`, `DropTestDummyFdbDatabase`
//         accordingly.
#[derive(Clone, Debug)]
pub struct FdbDatabase {
    // Shared pointer to the C `FDBDatabase` handle. It is `Some` for
    // every `FdbDatabase` that has been handed out; `Drop::drop`
    // takes the value (leaving `None`) in order to decide whether
    // `fdb_database_destroy` has to be called.
    c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
}

impl FdbDatabase {
    // In Java following method is on `Interface Database`.

    /// Creates a [`FdbTransaction`] that operates on this
    /// [`FdbDatabase`].
    pub fn create_transaction(&self) -> FdbResult<FdbTransaction> {
        let mut ptr: *mut fdb_sys::FDB_transaction = ptr::null_mut();
        // Safety: It is safe to unwrap here because if we have given
        // out an `FdbDatabase` then `c_ptr` *must* be
        // `Some<Arc<...>>`.
        check(unsafe {
            fdb_sys::fdb_database_create_transaction(
                (*(self.c_ptr.as_ref().unwrap())).as_ptr(),
                &mut ptr,
            )
        })
        .map(|_| {
            FdbTransaction::new(Some(Arc::new(NonNull::new(ptr).expect(
                "fdb_database_create_transaction returned null, but did not return an error",
            ))))
        })
    }

    /// Returns an array of [`Key`]s `k` such that `begin <= k < end`
    /// and `k` is located at the start of contiguous range stored on
    /// a single server.
    ///
    /// If `limit` is non-zero, only the first `limit` number of keys
    /// will be returned. In large databases, the number of boundary
    /// keys may be large. In these cases, a non-zero `limit` should
    /// be used, along with multiple calls to [`get_boundary_keys`].
    ///
    /// If `read_version` is non-zero, the boundary keys as of
    /// `read_version` will be returned.
    ///
    /// This method is not transactional.
    ///
    /// [`get_boundary_keys`]: FdbDatabase::get_boundary_keys
    pub async fn get_boundary_keys(
        &self,
        begin: impl Into<Key>,
        end: impl Into<Key>,
        limit: i32,
        read_version: i64,
    ) -> FdbResult<Vec<Key>> {
        let tr = self.create_transaction()?;

        if read_version != 0 {
            unsafe {
                tr.set_read_version(read_version);
            }
        }

        tr.set_option(TransactionOption::ReadSystemKeys)?;
        tr.set_option(TransactionOption::LockAware)?;

        let range = Range::new(
            {
                let mut b = BytesMut::new();
                b.put(&b"\xFF/keyServers/"[..]);
                b.put(Into::<Bytes>::into(begin.into()));
                Into::<Bytes>::into(b)
            },
            {
                let mut b = BytesMut::new();
                b.put(&b"\xFF/keyServers/"[..]);
                b.put(Into::<Bytes>::into(end.into()));
                Into::<Bytes>::into(b)
            },
        );

        let mut res = Vec::new();

        let mut range_stream = range.into_stream(&tr.snapshot(), {
            let mut ro = RangeOptions::default();
            ro.set_limit(limit);
            ro
        });

        while let Some(x) = range_stream.next().await {
            let kv = x?;
            res.push({
                // `13` because that is the length of
                // `"\xFF/keyServers/"`.
                Into::<Key>::into(Into::<Bytes>::into(kv.into_key()).slice(13..))
            });
        }

        Ok(res)
    }

    #[cfg(feature = "fdb-7_1")]
    /// Opens an existing tenant to be used for running transactions.
    ///
    /// **Note:** Opening a tenant does not check its existence in the
    /// cluster.
    pub fn open_tenant(&self, tenant_name: impl Into<Tenant>) -> FdbResult<FdbTenant> {
        let t = Bytes::from(tenant_name.into());
        let tenant_name = t.as_ref().as_ptr();
        let tenant_name_length = t.as_ref().len().try_into().unwrap();

        let mut ptr: *mut fdb_sys::FDB_tenant = ptr::null_mut();

        // Safety: It is safe to unwrap here because if we have given
        // out an `FdbDatabase` then `c_ptr` *must* be
        // `Some<Arc<...>>`.
        check(unsafe {
            fdb_sys::fdb_database_open_tenant(
                (*(self.c_ptr.as_ref().unwrap())).as_ptr(),
                tenant_name,
                tenant_name_length,
                &mut ptr,
            )
        })
        .map(|_| {
            FdbTenant::new(
                Some(Arc::new(NonNull::new(ptr).expect(
                    "fdb_database_open_tenant returned null, but did not return an error",
                ))),
                t.into(),
            )
        })
    }

    // In Java following method is on `Interface TransactionContext`.

    /// Runs a closure in the context that takes a [`FdbTransaction`].
    ///
    /// # Note
    ///
    /// The closure `FnMut: FnMut(FdbTransaction) -> Fut` will run
    /// multiple times (retry) when certain errors are
    /// encountered. Therefore the closure should be prepared to be
    /// called more than once. This consideration means that the
    /// closure should use caution when modifying state.
    pub async fn run<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
    where
        F: FnMut(FdbTransaction) -> Fut,
        Fut: Future<Output = FdbResult<T>>,
    {
        let t = self.create_transaction()?;

        loop {
            let ret_val = f(t.clone()).await;

            // Closure returned an error
            if let Err(e) = ret_val {
                if FdbError::layer_error(e.code()) {
                    // Check if it is a layer error. If so, just
                    // return it.
                    return Err(e);
                } else if let Err(e1) = unsafe { t.on_error(e) }.await {
                    // Check if `on_error` returned an error. This
                    // means we have a non-retryable error.
                    return Err(e1);
                } else {
                    continue;
                }
            }

            // No error from closure. Attempt to commit the
            // transaction.
            if let Err(e) = unsafe { t.commit() }.await {
                // Commit returned an error
                if let Err(e1) = unsafe { t.on_error(e) }.await {
                    // Check if `on_error` returned an error. This
                    // means we have a non-retryable error.
                    return Err(e1);
                } else {
                    continue;
                }
            }

            // Commit successful, return `Ok(T)`
            return ret_val;
        }
    }

    // In Java following method is on `Interface
    // ReadTransactionContext`.

    /// Runs a closure in the context that takes a
    /// [`FdbReadTransaction`].
    ///
    /// # Note
    ///
    /// The closure `F: FnMut(FdbReadTransaction) -> Fut` will run
    /// multiple times (retry) when certain errors are
    /// encountered. Therefore the closure should be prepared to be
    /// called more than once. This consideration means that the
    /// closure should use caution when modifying state.
    //
    // It is okay to for `F` to have the signature
    // `FnMut(FdbReadTransaction) -> Fut` because we are not allowing
    // any mutations to occur. We are only concerned about retrying in
    // case of retryable errors.
    pub async fn read<T, F, Fut>(&self, mut f: F) -> FdbResult<T>
    where
        F: FnMut(FdbReadTransaction) -> Fut,
        Fut: Future<Output = FdbResult<T>>,
    {
        let t = self.create_transaction()?.snapshot();
        loop {
            let ret_val = f(t.clone()).await;

            // Closure returned an error
            if let Err(e) = ret_val {
                if FdbError::layer_error(e.code()) {
                    // Check if it is a layer error. If so, just
                    // return it.
                    return Err(e);
                } else if let Err(e1) = unsafe { t.on_error(e) }.await {
                    // Check if `on_error` returned an error. This
                    // means we have a non-retryable error.
                    return Err(e1);
                } else {
                    continue;
                }
            }

            // We don't need to commit read transaction, return
            // `Ok(T)`
            return ret_val;
        }
    }

    /// Set options on a [`FdbDatabase`].
    pub fn set_option(&self, option: DatabaseOption) -> FdbResult<()> {
        // Safety: It is safe to unwrap here because if we have given
        // out an `FdbDatabase` then `c_ptr` *must* be
        // `Some<Arc<...>>`.
        unsafe { option.apply((self.c_ptr.as_ref().unwrap()).as_ptr()) }
    }

    pub(crate) fn new(c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>) -> FdbDatabase {
        FdbDatabase { c_ptr }
    }
}

impl Drop for FdbDatabase {
    /// Releases this clone's share of the database handle and calls
    /// `fdb_database_destroy` if it was the last one.
    fn drop(&mut self) {
        // `Arc::into_inner` is used instead of `Arc::try_unwrap`
        // because `try_unwrap` is racy when the last two clones are
        // dropped concurrently: both calls can observe a strong count
        // of two and fail, after which both `Arc`s are dropped
        // without anybody destroying the database. `into_inner`
        // guarantees that exactly one of the callers receives the
        // inner value.
        //
        // `take()` leaves `None` behind, so the pointer cannot be
        // used again through `self`.
        if let Some(db_ptr) = self.c_ptr.take().and_then(Arc::into_inner) {
            // We held the last reference to the pointer, so nothing
            // else can use the handle after it has been destroyed.
            unsafe {
                fdb_sys::fdb_database_destroy(db_ptr.as_ptr());
            }
        }
    }
}

// # Safety
//
// After `FdbDatabase` is created, `NonNull<fdb_sys::FDBDatabase>` is
// only ever read, until it is finally dropped.
//
// Copies share the pointer through an `Arc`, and `Drop::drop` calls
// `fdb_sys::fdb_database_destroy` when the last copy of the `Arc`
// pointer is dropped.
//
// Other than `Drop::drop` (which takes `&mut self` and therefore has
// exclusive access), we don't have any mutable state inside
// `FdbDatabase` that needs to be protected with exclusive access.
// This allows us to add the `Send` trait.
//
// `FdbDatabase` is read-only, *without* interior mutability, so it is
// safe to add the `Sync` trait.
//
// The main reason for adding `Send` and `Sync` traits is so that
// values of `FdbDatabase` can be moved to other threads.
//
// NOTE(review): the reasoning above only covers the Rust side. It
// presumably also relies on the FDB C API allowing one
// `FDBDatabase` handle to be used from several threads - confirm
// against the C API documentation.
unsafe impl Send for FdbDatabase {}
unsafe impl Sync for FdbDatabase {}

#[cfg(test)]
mod tests {
    use impls::impls;

    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::FdbDatabase;

    /// `FdbDatabase` must be `Send`, `Sync` and `Clone`, and must not
    /// be `Copy`.
    #[test]
    fn impls() {
        #[rustfmt::skip]
        assert!(impls!(
            FdbDatabase:
                Send &
                Sync &
                Clone &
                !Copy));
    }

    // Stand-in with the same field as `FdbDatabase`, but without a
    // `Drop` impl, so that it can be built around a dangling pointer.
    #[allow(dead_code)]
    #[derive(Clone, Debug)]
    struct DummyFdbDatabase {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DummyFdbDatabase {}
    unsafe impl Sync for DummyFdbDatabase {}

    /// A value with the layout of `FdbDatabase` can be passed where
    /// `Send + Sync + 'static` is required.
    #[test]
    fn trait_bounds() {
        fn trait_bounds_for_fdb_database<T>(_t: T)
        where
            T: Send + Sync + 'static,
        {
        }
        let d = DummyFdbDatabase {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };
        trait_bounds_for_fdb_database(d);
    }

    // Set by `DropTestDummyFdbDatabase::drop` when the last clone is
    // dropped. An `AtomicBool` already provides interior mutability,
    // so a plain `static` is enough; no `static mut` and no `unsafe`
    // is needed to access it.
    static DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED: AtomicBool = AtomicBool::new(false);

    // Stand-in with the same field as `FdbDatabase` whose `Drop` impl
    // records the point where the real type would destroy the
    // database, instead of calling into the C API.
    #[derive(Clone, Debug)]
    struct DropTestDummyFdbDatabase {
        c_ptr: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DropTestDummyFdbDatabase {}
    unsafe impl Sync for DropTestDummyFdbDatabase {}

    impl Drop for DropTestDummyFdbDatabase {
        fn drop(&mut self) {
            // `Arc::into_inner` returns `Some` for exactly one clone:
            // the last one to be dropped. Every other clone just
            // releases its reference.
            if self.c_ptr.take().and_then(Arc::into_inner).is_some() {
                DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.store(true, Ordering::SeqCst);
            }
        }
    }

    /// Dropping clones (also from spawned tasks) must not trigger the
    /// final drop; only dropping the last clone does.
    #[tokio::test]
    async fn multiple_drop() {
        let d0 = DropTestDummyFdbDatabase {
            c_ptr: Some(Arc::new(NonNull::dangling())),
        };

        // Initially this is false.
        assert!(!DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));

        let d1 = d0.clone();

        assert_eq!(Arc::strong_count(d1.c_ptr.as_ref().unwrap()), 2);

        tokio::spawn(async move {
            let _ = d1;
        })
        .await
        .unwrap();

        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        let d2 = d0.clone();
        let d3 = d2.clone();

        tokio::spawn(async move {
            let _ = d2;
            let _ = d3;
        })
        .await
        .unwrap();

        assert_eq!(Arc::strong_count(d0.c_ptr.as_ref().unwrap()), 1);

        drop(d0);

        assert!(DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));
    }
}