Whamcloud - gitweb
New RC 2.16.0-RC5
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2012, 2017, Intel Corporation.
5  * Use is subject to license terms.
6  */
7
8 /*
9  * The disk API is used by both the QMT and QSD to access/update on-disk index
10  * files. The API consists of the following functions:
11  *
12  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
13  *                                found.
14  * - lquota_disk_glb_find_create: look-up global index file, create it if not
15  *                                found.
16  * - lquota_disk_slv_find:        look-up a slave index file.
17  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
18  *                                required and create the index file on disk if
19  *                                it does not exist.
20  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
21  * - lquota_disk_read:            read quota settings from an index file
22  * - lquota_disk_declare_write:   reserve credits to update a record in an index
23  *                                file
24  * - lquota_disk_write:           update a record in an index file
25  * - lquota_disk_update_ver:      update version of an index file
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LQUOTA
32
33 #include <obd_class.h>
34 #include "lquota_internal.h"
35
36 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
37
38 /*
39  * Helper function looking up & creating if not found an index file with a
40  * dynamic fid.
41  */
42 static struct dt_object *
43 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
44                         struct dt_object *parent, struct lu_fid *fid,
45                         const struct dt_index_features *idx_feat,
46                         char *name)
47 {
48         struct lquota_thread_info *qti = lquota_info(env);
49         struct dt_object *obj;
50         struct local_oid_storage *los;
51         int rc;
52         ENTRY;
53
54         /* Set up local storage */
55         rc = local_oid_storage_init(env, dev, fid, &los);
56         if (rc)
57                 RETURN(ERR_PTR(rc));
58
59         /* lookup/create slave index file */
60         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
61                                          idx_feat);
62         if (IS_ERR(obj))
63                 GOTO(out, obj);
64
65         /* local_oid_storage_fini() will finalize the local storage device,
66          * we have to open the object in another device stack */
67         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
68         dt_object_put_nocache(env, obj);
69         obj = dt_locate(env, dev, &qti->qti_fid);
70         if (IS_ERR(obj))
71                 GOTO(out, obj);
72 out:
73         local_oid_storage_fini(env, los);
74         RETURN(obj);
75 }
76
77 /*
78  * helper function to generate the filename associated with a slave index file
79  */
80 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
81                                            struct obd_uuid *uuid,
82                                            char *filename)
83 {
84         char    *name, *uuid_str;
85
86         /* In most case, the uuid is NULL terminated */
87         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
88                 OBD_ALLOC(uuid_str, sizeof(*uuid));
89                 if (uuid_str == NULL)
90                         RETURN(-ENOMEM);
91                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
92         } else {
93                 uuid_str = (char *)uuid->uuid;
94         }
95
96         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
97          * the filesystem name in case this one is changed in the future */
98         name = strrchr(uuid_str, '-');
99         if (name == NULL) {
100                 name = strrchr(uuid_str, ':');
101                 if (name == NULL) {
102                         CERROR("Failed to extract extract filesystem "
103                                "name from UUID %s\n", uuid_str);
104                         if (uuid_str != uuid->uuid)
105                                 OBD_FREE(uuid_str, sizeof(*uuid));
106                         return -EINVAL;
107                 }
108         }
109         name++;
110
111         /* the filename is composed of the most signicant bits of the global
112          * FID, that's to say the oid which encodes the pool type and
113          * quota type, followed by the export UUID */
114         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
115
116         if (uuid_str != uuid->uuid)
117                 OBD_FREE(uuid_str, sizeof(*uuid));
118
119         return 0;
120 }
121
122 /*
123  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
124  * QSD instance. This function is also used to create per-pool directory on
125  * the quota master.
126  * The directory is created with a local sequence if it does not exist already.
127  * This function is called at ->ldo_prepare time when the full device stack is
128  * configured.
129  *
130  * \param env  - is the environment passed by the caller
131  * \param dev  - is the dt_device where to create the quota directory
132  * \param parent  - is the parent directory. If not specified, the directory
133  *                  will be created under the root directory
134  * \param name - is the name of quota directory to be created
135  *
136  * \retval     - pointer to quota root dt_object on success, appropriate error
137  *               on failure
138  */
139 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
140                                               struct dt_device *dev,
141                                               struct dt_object *parent,
142                                               const char *name)
143 {
144         struct lquota_thread_info       *qti = lquota_info(env);
145         struct dt_object                *qt_dir = NULL;
146         struct local_oid_storage        *los = NULL;
147         int                              rc;
148         ENTRY;
149
150         /* Set up local storage to create the quota directory.
151          * We use the sequence reserved for local named objects */
152         lu_local_name_obj_fid(&qti->qti_fid, 1);
153         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
154         if (rc)
155                 RETURN(ERR_PTR(rc));
156
157         if (parent == NULL) {
158                 /* Fetch dt object associated with root directory */
159                 rc = dt_root_get(env, dev, &qti->qti_fid);
160                 if (rc)
161                         GOTO(out, rc);
162
163                 parent = dt_locate_at(env, dev, &qti->qti_fid,
164                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
165                 if (IS_ERR(parent))
166                         GOTO(out, rc = PTR_ERR(parent));
167         } else {
168                 lu_object_get(&parent->do_lu);
169         }
170
171         /* create quota directory to be used for all quota index files */
172         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
173                                            S_IRUGO | S_IWUSR | S_IXUGO);
174         if (IS_ERR(qt_dir))
175                 GOTO(out, rc = PTR_ERR(qt_dir));
176
177         /* local_oid_storage_fini() will finalize the local storage device,
178          * we have to open the object in another device stack */
179         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
180         dt_object_put_nocache(env, qt_dir);
181         qt_dir = dt_locate(env, dev, &qti->qti_fid);
182         if (IS_ERR(qt_dir))
183                 GOTO(out, rc = PTR_ERR(qt_dir));
184
185         if (!dt_try_as_dir(env, qt_dir, true))
186                 GOTO(out, rc = -ENOTDIR);
187         EXIT;
188 out:
189         if (parent != NULL && !IS_ERR(parent))
190                 dt_object_put(env, parent);
191         if (los != NULL)
192                 local_oid_storage_fini(env, los);
193         if (rc) {
194                 if (qt_dir != NULL && !IS_ERR(qt_dir))
195                         dt_object_put(env, qt_dir);
196                 qt_dir = ERR_PTR(rc);
197         }
198         return qt_dir;
199 }
200
201 /*
202  * Look-up/create a global index file.
203  *
204  * \param env - is the environment passed by the caller
205  * \parap dev - is the dt_device where to lookup/create the global index file
206  * \param parent - is the parent directory where to create the global index if
207  *                 not found
208  * \param fid - is the fid of the global index to be looked up/created
209  * \parap local - indicates whether the index should be created with a local
210  *                generated fid or with \fid
211  *
212  * \retval     - pointer to the dt_object of the global index on success,
213  *               appropriate error on failure
214  */
215 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
216                                               struct dt_device *dev,
217                                               struct dt_object *parent,
218                                               struct lu_fid *fid, bool local)
219 {
220         struct lquota_thread_info       *qti = lquota_info(env);
221         struct dt_object                *glb_idx;
222         const struct dt_index_features  *idx_feat;
223         ENTRY;
224
225         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
226                local ? "local " : "", PFID(fid));
227
228         idx_feat = &dt_quota_glb_features;
229
230         /* the filename is composed of the most signicant bits of the FID,
231          * that's to say the oid which encodes the pool type and quota type */
232         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
233
234         if (local) {
235                 /* We use the sequence reserved for local named objects */
236                 lu_local_name_obj_fid(&qti->qti_fid, 1);
237                 glb_idx = lquota_disk_find_create(env, dev, parent,
238                                                   &qti->qti_fid, idx_feat,
239                                                   qti->qti_buf);
240         } else {
241                 /* look-up/create global index on disk */
242                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
243                                                               parent,
244                                                               qti->qti_buf,
245                                                               LQUOTA_MODE,
246                                                               idx_feat);
247         }
248
249         if (IS_ERR(glb_idx)) {
250                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
251                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
252                        PFID(fid), PTR_ERR(glb_idx), local);
253                 RETURN(glb_idx);
254         }
255
256         /* install index operation vector */
257         if (glb_idx->do_index_ops == NULL) {
258                 int rc;
259
260                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
261                 if (rc) {
262                         CERROR("%s: failed to setup index operations for "DFID
263                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
264                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
265                         dt_object_put(env, glb_idx);
266                         glb_idx = ERR_PTR(rc);
267                 }
268         }
269
270         RETURN(glb_idx);
271 }
272
273 /*
274  * Look-up a slave index file.
275  *
276  * \param env - is the environment passed by the caller
277  * \param dev - is the backend dt_device where to look-up/create the slave index
278  * \param parent - is the parent directory where to lookup the slave index
279  * \param glb_fid - is the fid of the global index file associated with this
280  *                  slave index.
281  * \param uuid    - is the uuid of slave which is (re)connecting to the master
282  *                  target
283  *
284  * \retval     - pointer to the dt_object of the slave index on success,
285  *               appropriate error on failure
286  */
287 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
288                                        struct dt_device *dev,
289                                        struct dt_object *parent,
290                                        const struct lu_fid *glb_fid,
291                                        struct obd_uuid *uuid)
292 {
293         struct lquota_thread_info       *qti = lquota_info(env);
294         struct dt_object                *slv_idx;
295         int                              rc;
296         ENTRY;
297
298         LASSERT(uuid != NULL);
299
300         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
301                obd_uuid2str(uuid));
302
303         /* generate filename associated with the slave */
304         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
305         if (rc)
306                 RETURN(ERR_PTR(rc));
307
308         /* lookup slave index file */
309         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
310         if (rc)
311                 RETURN(ERR_PTR(rc));
312
313         /* name is found, get the object */
314         slv_idx = dt_locate(env, dev, &qti->qti_fid);
315         if (IS_ERR(slv_idx))
316                 RETURN(slv_idx);
317
318         if (slv_idx->do_index_ops == NULL) {
319                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
320                                                    &dt_quota_slv_features);
321                 if (rc) {
322                         CERROR("%s: failed to setup slave index operations for "
323                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
324                                obd_uuid2str(uuid), rc);
325                         dt_object_put(env, slv_idx);
326                         slv_idx = ERR_PTR(rc);
327                 }
328         }
329
330         RETURN(slv_idx);
331 }
332
333 /*
334  * Look-up a slave index file. If the slave index isn't found:
335  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
336  *   create the index.
337  * - otherwise, we create the index file with a local reserved FID (see
338  *   lquota_local_oid)
339  *
340  * \param env - is the environment passed by the caller
341  * \param dev - is the backend dt_device where to look-up/create the slave index
342  * \param parent - is the parent directory where to create the slave index if
343  *                 it does not exist already
344  * \param glb_fid - is the fid of the global index file associated with this
345  *                  slave index.
346  * \param uuid    - is the uuid of slave which is (re)connecting to the master
347  *                  target
348  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
349  *                  & LQUOTA_GRP_OID) for the slave index creation or to
350  *                  allocate a new fid from sequence FID_SEQ_QUOTA
351  *
352  * \retval     - pointer to the dt_object of the slave index on success,
353  *               appropriate error on failure
354  */
355 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
356                                               struct dt_device *dev,
357                                               struct dt_object *parent,
358                                               struct lu_fid *glb_fid,
359                                               struct obd_uuid *uuid,
360                                               bool local)
361 {
362         struct lquota_thread_info       *qti = lquota_info(env);
363         struct dt_object                *slv_idx;
364         int                              rc, type;
365         ENTRY;
366
367         LASSERT(uuid != NULL);
368
369         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
370                obd_uuid2str(uuid));
371
372         /* generate filename associated with the slave */
373         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
374         if (rc)
375                 RETURN(ERR_PTR(rc));
376
377         if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
378                 type = LDD_F_SV_TYPE_MDT;
379         else
380                 type = LDD_F_SV_TYPE_OST;
381
382         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
383          * through the network */
384         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
385         qti->qti_fid.f_ver = 0;
386         if (local) {
387                 enum lquota_res_type pool_type;
388                 enum lquota_type qtype;
389
390                 rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
391                 if (rc)
392                         RETURN(ERR_PTR(rc));
393
394                 /* use predefined fid in the reserved oid list */
395                 if ((type == LDD_F_SV_TYPE_MDT && pool_type == LQUOTA_RES_MD) ||
396                     (type == LDD_F_SV_TYPE_OST && pool_type == LQUOTA_RES_DT))
397                         qti->qti_fid.f_oid = qtype2slv_oid(qtype);
398                 else
399                         qti->qti_fid.f_oid = pool_type << 16 |
400                                                         qtype2slv_oid(qtype);
401
402                 slv_idx = local_index_find_or_create_with_fid(env, dev,
403                                                               &qti->qti_fid,
404                                                               parent,
405                                                               qti->qti_buf,
406                                                               LQUOTA_MODE,
407                                                         &dt_quota_slv_features);
408         } else {
409                 /* allocate fid dynamically if index does not exist already */
410                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
411
412                 /* lookup/create slave index file */
413                 slv_idx = lquota_disk_find_create(env, dev, parent,
414                                                   &qti->qti_fid,
415                                                   &dt_quota_slv_features,
416                                                   qti->qti_buf);
417         }
418
419         if (IS_ERR(slv_idx))
420                 RETURN(slv_idx);
421
422         /* install index operation vector */
423         if (slv_idx->do_index_ops == NULL) {
424                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
425                                                    &dt_quota_slv_features);
426                 if (rc) {
427                         CERROR("%s: failed to setup index operations for "DFID
428                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
429                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
430                         dt_object_put(env, slv_idx);
431                         slv_idx = ERR_PTR(rc);
432                 }
433         }
434
435         RETURN(slv_idx);
436 }
437
438 /*
439  * Iterate over all slave index files associated with global index \glb_fid and
440  * invoke a callback function for each slave index file.
441  *
442  * \param env     - is the environment passed by the caller
443  * \param parent  - is the parent directory where the slave index files are
444  *                  stored
445  * \param glb_fid - is the fid of the global index file associated with the
446  *                  slave indexes to scan
447  * \param func    - is the callback function to call each time a slave index
448  *                  file is found
449  * \param arg     - is an opaq argument passed to the callback function \func
450  */
451 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
452                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
453                              void *arg)
454 {
455         struct lquota_thread_info       *qti = lquota_info(env);
456         struct dt_it                    *it;
457         const struct dt_it_ops          *iops;
458         char                            *name;
459         int                              rc;
460         ENTRY;
461
462         OBD_ALLOC(name, LQUOTA_NAME_MAX);
463         if (name == NULL)
464                 RETURN(-ENOMEM);
465
466         /* filename associated with slave index files are prefixed with the most
467          * signicant bits of the global FID */
468         sprintf(name, "0x%x-", glb_fid->f_oid);
469
470         iops = &parent->do_index_ops->dio_it;
471         it = iops->init(env, parent, 0);
472         if (IS_ERR(it)) {
473                 OBD_FREE(name, LQUOTA_NAME_MAX);
474                 RETURN(PTR_ERR(it));
475         }
476
477         rc = iops->load(env, it, 0);
478         if (rc == 0) {
479                 /*
480                  * Iterator didn't find record with exactly the key requested.
481                  *
482                  * It is currently either
483                  *
484                  *     - positioned above record with key less than
485                  *     requested---skip it.
486                  *
487                  *     - or not positioned at all (is in IAM_IT_SKEWED
488                  *     state)---position it on the next item.
489                  */
490                 rc = iops->next(env, it);
491         } else if (rc > 0)
492                 rc = 0;
493
494         while (rc == 0) {
495                 struct dt_key   *key;
496                 int              len;
497
498                 len = iops->key_size(env, it);
499                 /* IAM iterator can return record with zero len. */
500                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
501                         goto next;
502
503                 key = iops->key(env, it);
504                 if (IS_ERR(key)) {
505                         rc = PTR_ERR(key);
506                         break;
507                 }
508
509                 if (strncmp((char *)key, name, strlen(name)) != 0)
510                         goto next;
511
512                 /* ldiskfs OSD returns filename as stored in directory entry
513                  * which does not end up with '\0' */
514                 memcpy(&qti->qti_buf, key, len);
515                 qti->qti_buf[len] = '\0';
516
517                 /* lookup fid associated with this slave index file */
518                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
519                 if (rc)
520                         break;
521
522                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
523                         goto next;
524
525                 rc = func(env, glb_fid, qti->qti_buf, &qti->qti_fid, arg);
526                 if (rc)
527                         break;
528 next:
529                 do {
530                         rc = iops->next(env, it);
531                 } while (rc == -ESTALE);
532         }
533
534         iops->put(env, it);
535         iops->fini(env, it);
536         OBD_FREE(name, LQUOTA_NAME_MAX);
537         if (rc > 0)
538                 rc = 0;
539         RETURN(rc);
540 }
541
542 /*
543  * Retrieve quota settings from disk for a particular identifier.
544  *
545  * \param env - is the environment passed by the caller
546  * \param obj - is the on-disk index where quota settings are stored.
547  * \param id  - is the key to be updated
548  * \param rec - is the output record where to store quota settings.
549  *
550  * \retval    - 0 on success, appropriate error on failure
551  */
552 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
553                      union lquota_id *id, struct dt_rec *rec)
554 {
555         int     rc;
556         ENTRY;
557
558         LASSERT(dt_object_exists(obj));
559         LASSERT(obj->do_index_ops != NULL);
560
561         /* lookup on-disk record from index file */
562         dt_read_lock(env, obj, 0);
563         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid);
564         dt_read_unlock(env, obj);
565
566         RETURN(rc);
567 }
568
569 /*
570  * Reserve enough credits to update a record in a quota index file.
571  *
572  * \param env - is the environment passed by the caller
573  * \param th  - is the transaction to use for disk writes
574  * \param obj - is the on-disk index where quota settings are stored.
575  * \param id  - is the key to be updated
576  *
577  * \retval    - 0 on success, appropriate error on failure
578  */
579 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
580                               struct dt_object *obj, union lquota_id *id)
581 {
582         struct lquota_thread_info       *qti = lquota_info(env);
583         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
584         int                              rc;
585         ENTRY;
586
587         LASSERT(dt_object_exists(obj));
588         LASSERT(obj->do_index_ops != NULL);
589
590         /* speculative delete declaration in case there is already an existing
591          * record in the index */
592         rc = dt_declare_delete(env, obj, key, th);
593         if (rc)
594                 RETURN(rc);
595
596         /* declare insertion of updated record */
597         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
598                                th);
599         if (rc)
600                 RETURN(rc);
601
602         /* we might have to update the version of the global index too */
603         rc = dt_declare_version_set(env, obj, th);
604
605         RETURN(rc);
606 }
607
608 /*
609  * Update a record in a quota index file.
610  *
611  * \param env - is the environment passed by the caller
612  * \param th  - is the transaction to use for disk writes
613  * \param obj - is the on-disk index to be updated.
614  * \param id  - is the key to be updated
615  * \param rec - is the input record containing the new quota settings.
616  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
617  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
618  *                used to return the new version of the index when
619  *                LQUOTA_BUMP_VER is set.
620  *
621  * \retval    - 0 on success, appropriate error on failure
622  */
623 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
624                       struct dt_object *obj, union lquota_id *id,
625                       struct dt_rec *rec, __u32 flags, __u64 *ver)
626 {
627         struct lquota_thread_info       *qti = lquota_info(env);
628         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
629         int                              rc;
630         ENTRY;
631
632         LASSERT(dt_object_exists(obj));
633         LASSERT(obj->do_index_ops != NULL);
634
635         /* lock index */
636         dt_write_lock(env, obj, 0);
637
638         /* check whether there is already an existing record for this ID */
639         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
640         if (rc == 0) {
641                 /* delete existing record in order to replace it */
642                 rc = dt_delete(env, obj, key, th);
643                 if (rc)
644                         GOTO(out, rc);
645         } else if (rc == -ENOENT) {
646                 /* probably first insert */
647                 rc = 0;
648         } else {
649                 GOTO(out, rc);
650         }
651
652         if (rec != NULL) {
653                 /* insert record with updated quota settings */
654                 rc = dt_insert(env, obj, rec, key, th);
655                 if (rc) {
656                         /* try to insert the old one */
657                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
658                                        key, th);
659                         LASSERTF(rc == 0, "failed to insert record in quota "
660                                  "index "DFID"\n",
661                                  PFID(lu_object_fid(&obj->do_lu)));
662                         GOTO(out, rc);
663                 }
664         }
665
666         if (flags != 0) {
667                 LASSERT(ver);
668                 if (flags & LQUOTA_BUMP_VER) {
669                         /* caller wants to bump the version, let's first read
670                          * it */
671                         *ver = dt_version_get(env, obj);
672                         (*ver)++;
673                 } else {
674                         LASSERT(flags & LQUOTA_SET_VER);
675                 }
676                 dt_version_set(env, obj, *ver, th);
677         }
678
679         EXIT;
680 out:
681         dt_write_unlock(env, obj);
682         return rc;
683 }
684
685 int lquota_disk_delete(const struct lu_env *env, struct thandle *th,
686                        struct dt_object *obj, __u64 qid, __u64 *ver)
687 {
688         struct lquota_thread_info       *qti = lquota_info(env);
689         struct dt_key                   *key = (struct dt_key *)&qid;
690         int                              rc;
691
692         ENTRY;
693
694         LASSERT(dt_object_exists(obj));
695         LASSERT(obj->do_index_ops != NULL);
696
697         /* lock index */
698         dt_write_lock(env, obj, 0);
699
700         /* check whether there is already an existing record for this ID */
701         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
702         if (rc == 0) {
703                 rc = dt_delete(env, obj, key, th);
704                 if (rc == 0 && ver != NULL) {
705                         *ver = dt_version_get(env, obj);
706                         (*ver)++;
707                         dt_version_set(env, obj, *ver, th);
708                 }
709         }
710
711         dt_write_unlock(env, obj);
712         RETURN(rc);
713 }
714
715 /*
716  * Update version of an index file
717  *
718  * \param env - is the environment passed by the caller
719  * \param dev - is the backend dt device storing the index file
720  * \param obj - is the on-disk index that should be updated
721  * \param ver - is the new version
722  */
723 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
724                            struct dt_object *obj, __u64 ver)
725 {
726         struct thandle  *th;
727         int              rc;
728         ENTRY;
729
730         th = dt_trans_create(env, dev);
731         if (IS_ERR(th))
732                 RETURN(PTR_ERR(th));
733
734         rc = dt_declare_version_set(env, obj, th);
735         if (rc)
736                 GOTO(out, rc);
737
738         rc = dt_trans_start_local(env, dev, th);
739         if (rc)
740                 GOTO(out, rc);
741         th->th_sync = 1;
742
743         dt_version_set(env, obj, ver, th);
744         EXIT;
745 out:
746         dt_trans_stop(env, dev, th);
747         return rc;
748 }
749
750 /*
751  * Write a global record
752  *
753  * \param env - is the environment passed by the caller
754  * \param obj - is the on-disk global index to be updated
755  * \param id  - index to be updated
756  * \param rec - record to be written
757  */
758 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
759                           __u64 id, struct lquota_glb_rec *rec)
760 {
761         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
762         struct thandle          *th;
763         struct dt_key           *key = (struct dt_key *)&id;
764         int                      rc;
765         ENTRY;
766
767         th = dt_trans_create(env, dev);
768         if (IS_ERR(th))
769                 RETURN(PTR_ERR(th));
770
771         /* the entry with 0 key can always be found in IAM file. */
772         if (id == 0) {
773                 rc = dt_declare_delete(env, obj, key, th);
774                 if (rc)
775                         GOTO(out, rc);
776         }
777
778         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
779         if (rc)
780                 GOTO(out, rc);
781
782         rc = dt_trans_start_local(env, dev, th);
783         if (rc)
784                 GOTO(out, rc);
785
786         dt_write_lock(env, obj, 0);
787
788         if (id == 0) {
789                 struct lquota_glb_rec *tmp;
790
791                 OBD_ALLOC_PTR(tmp);
792                 if (tmp == NULL)
793                         GOTO(out_lock, rc = -ENOMEM);
794
795                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key);
796
797                 OBD_FREE_PTR(tmp);
798                 if (rc == 0) {
799                         rc = dt_delete(env, obj, key, th);
800                         if (rc)
801                                 GOTO(out_lock, rc);
802                 }
803         }
804
805         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th);
806 out_lock:
807         dt_write_unlock(env, obj);
808 out:
809         dt_trans_stop(env, dev, th);
810         RETURN(rc);
811 }