Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * The disk API is used by both the QMT and QSD to access/update on-disk index
33  * files. The API consists of the following functions:
34  *
35  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
36  *                                found.
37  * - lquota_disk_glb_find_create: look-up global index file, create it if not
38  *                                found.
39  * - lquota_disk_slv_find:        look-up a slave index file.
40  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
41  *                                required and create the index file on disk if
42  *                                it does not exist.
43  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
44  * - lquota_disk_read:            read quota settings from an index file
45  * - lquota_disk_declare_write:   reserve credits to update a record in an index
46  *                                file
47  * - lquota_disk_write:           update a record in an index file
48  * - lquota_disk_update_ver:      update version of an index file
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include "lquota_internal.h"
55
56 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
57
58 /*
59  * Helper function looking up & creating if not found an index file with a
60  * dynamic fid.
61  */
62 static struct dt_object *
63 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
64                         struct dt_object *parent, struct lu_fid *fid,
65                         const struct dt_index_features *idx_feat,
66                         char *name)
67 {
68         struct lquota_thread_info *qti = lquota_info(env);
69         struct dt_object *obj;
70         struct local_oid_storage *los;
71         int rc;
72         ENTRY;
73
74         /* Set up local storage */
75         rc = local_oid_storage_init(env, dev, fid, &los);
76         if (rc)
77                 RETURN(ERR_PTR(rc));
78
79         /* lookup/create slave index file */
80         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
81                                          idx_feat);
82         if (IS_ERR(obj))
83                 GOTO(out, obj);
84
85         /* local_oid_storage_fini() will finalize the local storage device,
86          * we have to open the object in another device stack */
87         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
88         dt_object_put_nocache(env, obj);
89         obj = dt_locate(env, dev, &qti->qti_fid);
90         if (IS_ERR(obj))
91                 GOTO(out, obj);
92 out:
93         local_oid_storage_fini(env, los);
94         RETURN(obj);
95 }
96
97 /*
98  * helper function to generate the filename associated with a slave index file
99  */
100 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
101                                            struct obd_uuid *uuid,
102                                            char *filename)
103 {
104         char    *name, *uuid_str;
105
106         /* In most case, the uuid is NULL terminated */
107         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
108                 OBD_ALLOC(uuid_str, sizeof(*uuid));
109                 if (uuid_str == NULL)
110                         RETURN(-ENOMEM);
111                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
112         } else {
113                 uuid_str = (char *)uuid->uuid;
114         }
115
116         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
117          * the filesystem name in case this one is changed in the future */
118         name = strrchr(uuid_str, '-');
119         if (name == NULL) {
120                 name = strrchr(uuid_str, ':');
121                 if (name == NULL) {
122                         CERROR("Failed to extract extract filesystem "
123                                "name from UUID %s\n", uuid_str);
124                         if (uuid_str != uuid->uuid)
125                                 OBD_FREE(uuid_str, sizeof(*uuid));
126                         return -EINVAL;
127                 }
128         }
129         name++;
130
131         /* the filename is composed of the most signicant bits of the global
132          * FID, that's to say the oid which encodes the pool type and
133          * quota type, followed by the export UUID */
134         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
135
136         if (uuid_str != uuid->uuid)
137                 OBD_FREE(uuid_str, sizeof(*uuid));
138
139         return 0;
140 }
141
142 /*
143  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
144  * QSD instance. This function is also used to create per-pool directory on
145  * the quota master.
146  * The directory is created with a local sequence if it does not exist already.
147  * This function is called at ->ldo_prepare time when the full device stack is
148  * configured.
149  *
150  * \param env  - is the environment passed by the caller
151  * \param dev  - is the dt_device where to create the quota directory
152  * \param parent  - is the parent directory. If not specified, the directory
153  *                  will be created under the root directory
154  * \param name - is the name of quota directory to be created
155  *
156  * \retval     - pointer to quota root dt_object on success, appropriate error
157  *               on failure
158  */
159 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
160                                               struct dt_device *dev,
161                                               struct dt_object *parent,
162                                               const char *name)
163 {
164         struct lquota_thread_info       *qti = lquota_info(env);
165         struct dt_object                *qt_dir = NULL;
166         struct local_oid_storage        *los = NULL;
167         int                              rc;
168         ENTRY;
169
170         /* Set up local storage to create the quota directory.
171          * We use the sequence reserved for local named objects */
172         lu_local_name_obj_fid(&qti->qti_fid, 1);
173         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
174         if (rc)
175                 RETURN(ERR_PTR(rc));
176
177         if (parent == NULL) {
178                 /* Fetch dt object associated with root directory */
179                 rc = dt_root_get(env, dev, &qti->qti_fid);
180                 if (rc)
181                         GOTO(out, rc);
182
183                 parent = dt_locate_at(env, dev, &qti->qti_fid,
184                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
185                 if (IS_ERR(parent))
186                         GOTO(out, rc = PTR_ERR(parent));
187         } else {
188                 lu_object_get(&parent->do_lu);
189         }
190
191         /* create quota directory to be used for all quota index files */
192         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
193                                            S_IRUGO | S_IWUSR | S_IXUGO);
194         if (IS_ERR(qt_dir))
195                 GOTO(out, rc = PTR_ERR(qt_dir));
196
197         /* local_oid_storage_fini() will finalize the local storage device,
198          * we have to open the object in another device stack */
199         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
200         dt_object_put_nocache(env, qt_dir);
201         qt_dir = dt_locate(env, dev, &qti->qti_fid);
202         if (IS_ERR(qt_dir))
203                 GOTO(out, rc = PTR_ERR(qt_dir));
204
205         if (!dt_try_as_dir(env, qt_dir, true))
206                 GOTO(out, rc = -ENOTDIR);
207         EXIT;
208 out:
209         if (parent != NULL && !IS_ERR(parent))
210                 dt_object_put(env, parent);
211         if (los != NULL)
212                 local_oid_storage_fini(env, los);
213         if (rc) {
214                 if (qt_dir != NULL && !IS_ERR(qt_dir))
215                         dt_object_put(env, qt_dir);
216                 qt_dir = ERR_PTR(rc);
217         }
218         return qt_dir;
219 }
220
221 /*
222  * Look-up/create a global index file.
223  *
224  * \param env - is the environment passed by the caller
225  * \parap dev - is the dt_device where to lookup/create the global index file
226  * \param parent - is the parent directory where to create the global index if
227  *                 not found
228  * \param fid - is the fid of the global index to be looked up/created
229  * \parap local - indicates whether the index should be created with a local
230  *                generated fid or with \fid
231  *
232  * \retval     - pointer to the dt_object of the global index on success,
233  *               appropriate error on failure
234  */
235 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
236                                               struct dt_device *dev,
237                                               struct dt_object *parent,
238                                               struct lu_fid *fid, bool local)
239 {
240         struct lquota_thread_info       *qti = lquota_info(env);
241         struct dt_object                *glb_idx;
242         const struct dt_index_features  *idx_feat;
243         ENTRY;
244
245         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
246                local ? "local " : "", PFID(fid));
247
248         idx_feat = &dt_quota_glb_features;
249
250         /* the filename is composed of the most signicant bits of the FID,
251          * that's to say the oid which encodes the pool type and quota type */
252         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
253
254         if (local) {
255                 /* We use the sequence reserved for local named objects */
256                 lu_local_name_obj_fid(&qti->qti_fid, 1);
257                 glb_idx = lquota_disk_find_create(env, dev, parent,
258                                                   &qti->qti_fid, idx_feat,
259                                                   qti->qti_buf);
260         } else {
261                 /* look-up/create global index on disk */
262                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
263                                                               parent,
264                                                               qti->qti_buf,
265                                                               LQUOTA_MODE,
266                                                               idx_feat);
267         }
268
269         if (IS_ERR(glb_idx)) {
270                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
271                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
272                        PFID(fid), PTR_ERR(glb_idx), local);
273                 RETURN(glb_idx);
274         }
275
276         /* install index operation vector */
277         if (glb_idx->do_index_ops == NULL) {
278                 int rc;
279
280                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
281                 if (rc) {
282                         CERROR("%s: failed to setup index operations for "DFID
283                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
284                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
285                         dt_object_put(env, glb_idx);
286                         glb_idx = ERR_PTR(rc);
287                 }
288         }
289
290         RETURN(glb_idx);
291 }
292
293 /*
294  * Look-up a slave index file.
295  *
296  * \param env - is the environment passed by the caller
297  * \param dev - is the backend dt_device where to look-up/create the slave index
298  * \param parent - is the parent directory where to lookup the slave index
299  * \param glb_fid - is the fid of the global index file associated with this
300  *                  slave index.
301  * \param uuid    - is the uuid of slave which is (re)connecting to the master
302  *                  target
303  *
304  * \retval     - pointer to the dt_object of the slave index on success,
305  *               appropriate error on failure
306  */
307 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
308                                        struct dt_device *dev,
309                                        struct dt_object *parent,
310                                        const struct lu_fid *glb_fid,
311                                        struct obd_uuid *uuid)
312 {
313         struct lquota_thread_info       *qti = lquota_info(env);
314         struct dt_object                *slv_idx;
315         int                              rc;
316         ENTRY;
317
318         LASSERT(uuid != NULL);
319
320         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
321                obd_uuid2str(uuid));
322
323         /* generate filename associated with the slave */
324         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
325         if (rc)
326                 RETURN(ERR_PTR(rc));
327
328         /* lookup slave index file */
329         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
330         if (rc)
331                 RETURN(ERR_PTR(rc));
332
333         /* name is found, get the object */
334         slv_idx = dt_locate(env, dev, &qti->qti_fid);
335         if (IS_ERR(slv_idx))
336                 RETURN(slv_idx);
337
338         if (slv_idx->do_index_ops == NULL) {
339                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
340                                                    &dt_quota_slv_features);
341                 if (rc) {
342                         CERROR("%s: failed to setup slave index operations for "
343                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
344                                obd_uuid2str(uuid), rc);
345                         dt_object_put(env, slv_idx);
346                         slv_idx = ERR_PTR(rc);
347                 }
348         }
349
350         RETURN(slv_idx);
351 }
352
353 /*
354  * Look-up a slave index file. If the slave index isn't found:
355  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
356  *   create the index.
357  * - otherwise, we create the index file with a local reserved FID (see
358  *   lquota_local_oid)
359  *
360  * \param env - is the environment passed by the caller
361  * \param dev - is the backend dt_device where to look-up/create the slave index
362  * \param parent - is the parent directory where to create the slave index if
363  *                 it does not exist already
364  * \param glb_fid - is the fid of the global index file associated with this
365  *                  slave index.
366  * \param uuid    - is the uuid of slave which is (re)connecting to the master
367  *                  target
368  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
369  *                  & LQUOTA_GRP_OID) for the slave index creation or to
370  *                  allocate a new fid from sequence FID_SEQ_QUOTA
371  *
372  * \retval     - pointer to the dt_object of the slave index on success,
373  *               appropriate error on failure
374  */
375 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
376                                               struct dt_device *dev,
377                                               struct dt_object *parent,
378                                               struct lu_fid *glb_fid,
379                                               struct obd_uuid *uuid,
380                                               bool local)
381 {
382         struct lquota_thread_info       *qti = lquota_info(env);
383         struct dt_object                *slv_idx;
384         int                              rc, type;
385         ENTRY;
386
387         LASSERT(uuid != NULL);
388
389         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
390                obd_uuid2str(uuid));
391
392         /* generate filename associated with the slave */
393         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
394         if (rc)
395                 RETURN(ERR_PTR(rc));
396
397         if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
398                 type = LDD_F_SV_TYPE_MDT;
399         else
400                 type = LDD_F_SV_TYPE_OST;
401
402         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
403          * through the network */
404         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
405         qti->qti_fid.f_ver = 0;
406         if (local) {
407                 int pool_type, qtype;
408
409                 rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
410                 if (rc)
411                         RETURN(ERR_PTR(rc));
412
413                 /* use predefined fid in the reserved oid list */
414                 if ((type == LDD_F_SV_TYPE_MDT && pool_type == LQUOTA_RES_MD) ||
415                     (type == LDD_F_SV_TYPE_OST && pool_type == LQUOTA_RES_DT))
416                         qti->qti_fid.f_oid = qtype2slv_oid(qtype);
417                 else
418                         qti->qti_fid.f_oid = pool_type << 16 |
419                                                         qtype2slv_oid(qtype);
420
421                 slv_idx = local_index_find_or_create_with_fid(env, dev,
422                                                               &qti->qti_fid,
423                                                               parent,
424                                                               qti->qti_buf,
425                                                               LQUOTA_MODE,
426                                                         &dt_quota_slv_features);
427         } else {
428                 /* allocate fid dynamically if index does not exist already */
429                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
430
431                 /* lookup/create slave index file */
432                 slv_idx = lquota_disk_find_create(env, dev, parent,
433                                                   &qti->qti_fid,
434                                                   &dt_quota_slv_features,
435                                                   qti->qti_buf);
436         }
437
438         if (IS_ERR(slv_idx))
439                 RETURN(slv_idx);
440
441         /* install index operation vector */
442         if (slv_idx->do_index_ops == NULL) {
443                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
444                                                    &dt_quota_slv_features);
445                 if (rc) {
446                         CERROR("%s: failed to setup index operations for "DFID
447                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
448                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
449                         dt_object_put(env, slv_idx);
450                         slv_idx = ERR_PTR(rc);
451                 }
452         }
453
454         RETURN(slv_idx);
455 }
456
457 /*
458  * Iterate over all slave index files associated with global index \glb_fid and
459  * invoke a callback function for each slave index file.
460  *
461  * \param env     - is the environment passed by the caller
462  * \param parent  - is the parent directory where the slave index files are
463  *                  stored
464  * \param glb_fid - is the fid of the global index file associated with the
465  *                  slave indexes to scan
466  * \param func    - is the callback function to call each time a slave index
467  *                  file is found
468  * \param arg     - is an opaq argument passed to the callback function \func
469  */
470 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
471                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
472                              void *arg)
473 {
474         struct lquota_thread_info       *qti = lquota_info(env);
475         struct dt_it                    *it;
476         const struct dt_it_ops          *iops;
477         char                            *name;
478         int                              rc;
479         ENTRY;
480
481         OBD_ALLOC(name, LQUOTA_NAME_MAX);
482         if (name == NULL)
483                 RETURN(-ENOMEM);
484
485         /* filename associated with slave index files are prefixed with the most
486          * signicant bits of the global FID */
487         sprintf(name, "0x%x-", glb_fid->f_oid);
488
489         iops = &parent->do_index_ops->dio_it;
490         it = iops->init(env, parent, 0);
491         if (IS_ERR(it)) {
492                 OBD_FREE(name, LQUOTA_NAME_MAX);
493                 RETURN(PTR_ERR(it));
494         }
495
496         rc = iops->load(env, it, 0);
497         if (rc == 0) {
498                 /*
499                  * Iterator didn't find record with exactly the key requested.
500                  *
501                  * It is currently either
502                  *
503                  *     - positioned above record with key less than
504                  *     requested---skip it.
505                  *
506                  *     - or not positioned at all (is in IAM_IT_SKEWED
507                  *     state)---position it on the next item.
508                  */
509                 rc = iops->next(env, it);
510         } else if (rc > 0)
511                 rc = 0;
512
513         while (rc == 0) {
514                 struct dt_key   *key;
515                 int              len;
516
517                 len = iops->key_size(env, it);
518                 /* IAM iterator can return record with zero len. */
519                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
520                         goto next;
521
522                 key = iops->key(env, it);
523                 if (IS_ERR(key)) {
524                         rc = PTR_ERR(key);
525                         break;
526                 }
527
528                 if (strncmp((char *)key, name, strlen(name)) != 0)
529                         goto next;
530
531                 /* ldiskfs OSD returns filename as stored in directory entry
532                  * which does not end up with '\0' */
533                 memcpy(&qti->qti_buf, key, len);
534                 qti->qti_buf[len] = '\0';
535
536                 /* lookup fid associated with this slave index file */
537                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
538                 if (rc)
539                         break;
540
541                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
542                         goto next;
543
544                 rc = func(env, glb_fid, qti->qti_buf, &qti->qti_fid, arg);
545                 if (rc)
546                         break;
547 next:
548                 do {
549                         rc = iops->next(env, it);
550                 } while (rc == -ESTALE);
551         }
552
553         iops->put(env, it);
554         iops->fini(env, it);
555         OBD_FREE(name, LQUOTA_NAME_MAX);
556         if (rc > 0)
557                 rc = 0;
558         RETURN(rc);
559 }
560
561 /*
562  * Retrieve quota settings from disk for a particular identifier.
563  *
564  * \param env - is the environment passed by the caller
565  * \param obj - is the on-disk index where quota settings are stored.
566  * \param id  - is the key to be updated
567  * \param rec - is the output record where to store quota settings.
568  *
569  * \retval    - 0 on success, appropriate error on failure
570  */
571 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
572                      union lquota_id *id, struct dt_rec *rec)
573 {
574         int     rc;
575         ENTRY;
576
577         LASSERT(dt_object_exists(obj));
578         LASSERT(obj->do_index_ops != NULL);
579
580         /* lookup on-disk record from index file */
581         dt_read_lock(env, obj, 0);
582         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid);
583         dt_read_unlock(env, obj);
584
585         RETURN(rc);
586 }
587
588 /*
589  * Reserve enough credits to update a record in a quota index file.
590  *
591  * \param env - is the environment passed by the caller
592  * \param th  - is the transaction to use for disk writes
593  * \param obj - is the on-disk index where quota settings are stored.
594  * \param id  - is the key to be updated
595  *
596  * \retval    - 0 on success, appropriate error on failure
597  */
598 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
599                               struct dt_object *obj, union lquota_id *id)
600 {
601         struct lquota_thread_info       *qti = lquota_info(env);
602         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
603         int                              rc;
604         ENTRY;
605
606         LASSERT(dt_object_exists(obj));
607         LASSERT(obj->do_index_ops != NULL);
608
609         /* speculative delete declaration in case there is already an existing
610          * record in the index */
611         rc = dt_declare_delete(env, obj, key, th);
612         if (rc)
613                 RETURN(rc);
614
615         /* declare insertion of updated record */
616         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
617                                th);
618         if (rc)
619                 RETURN(rc);
620
621         /* we might have to update the version of the global index too */
622         rc = dt_declare_version_set(env, obj, th);
623
624         RETURN(rc);
625 }
626
627 /*
628  * Update a record in a quota index file.
629  *
630  * \param env - is the environment passed by the caller
631  * \param th  - is the transaction to use for disk writes
632  * \param obj - is the on-disk index to be updated.
633  * \param id  - is the key to be updated
634  * \param rec - is the input record containing the new quota settings.
635  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
636  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
637  *                used to return the new version of the index when
638  *                LQUOTA_BUMP_VER is set.
639  *
640  * \retval    - 0 on success, appropriate error on failure
641  */
642 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
643                       struct dt_object *obj, union lquota_id *id,
644                       struct dt_rec *rec, __u32 flags, __u64 *ver)
645 {
646         struct lquota_thread_info       *qti = lquota_info(env);
647         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
648         int                              rc;
649         ENTRY;
650
651         LASSERT(dt_object_exists(obj));
652         LASSERT(obj->do_index_ops != NULL);
653
654         /* lock index */
655         dt_write_lock(env, obj, 0);
656
657         /* check whether there is already an existing record for this ID */
658         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
659         if (rc == 0) {
660                 /* delete existing record in order to replace it */
661                 rc = dt_delete(env, obj, key, th);
662                 if (rc)
663                         GOTO(out, rc);
664         } else if (rc == -ENOENT) {
665                 /* probably first insert */
666                 rc = 0;
667         } else {
668                 GOTO(out, rc);
669         }
670
671         if (rec != NULL) {
672                 /* insert record with updated quota settings */
673                 rc = dt_insert(env, obj, rec, key, th);
674                 if (rc) {
675                         /* try to insert the old one */
676                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
677                                        key, th);
678                         LASSERTF(rc == 0, "failed to insert record in quota "
679                                  "index "DFID"\n",
680                                  PFID(lu_object_fid(&obj->do_lu)));
681                         GOTO(out, rc);
682                 }
683         }
684
685         if (flags != 0) {
686                 LASSERT(ver);
687                 if (flags & LQUOTA_BUMP_VER) {
688                         /* caller wants to bump the version, let's first read
689                          * it */
690                         *ver = dt_version_get(env, obj);
691                         (*ver)++;
692                 } else {
693                         LASSERT(flags & LQUOTA_SET_VER);
694                 }
695                 dt_version_set(env, obj, *ver, th);
696         }
697
698         EXIT;
699 out:
700         dt_write_unlock(env, obj);
701         return rc;
702 }
703
704 int lquota_disk_delete(const struct lu_env *env, struct thandle *th,
705                        struct dt_object *obj, __u64 qid, __u64 *ver)
706 {
707         struct lquota_thread_info       *qti = lquota_info(env);
708         struct dt_key                   *key = (struct dt_key *)&qid;
709         int                              rc;
710
711         ENTRY;
712
713         LASSERT(dt_object_exists(obj));
714         LASSERT(obj->do_index_ops != NULL);
715
716         /* lock index */
717         dt_write_lock(env, obj, 0);
718
719         /* check whether there is already an existing record for this ID */
720         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
721         if (rc == 0) {
722                 rc = dt_delete(env, obj, key, th);
723                 if (rc == 0 && ver != NULL) {
724                         *ver = dt_version_get(env, obj);
725                         (*ver)++;
726                         dt_version_set(env, obj, *ver, th);
727                 }
728         }
729
730         dt_write_unlock(env, obj);
731         RETURN(rc);
732 }
733
734 /*
735  * Update version of an index file
736  *
737  * \param env - is the environment passed by the caller
738  * \param dev - is the backend dt device storing the index file
739  * \param obj - is the on-disk index that should be updated
740  * \param ver - is the new version
741  */
742 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
743                            struct dt_object *obj, __u64 ver)
744 {
745         struct thandle  *th;
746         int              rc;
747         ENTRY;
748
749         th = dt_trans_create(env, dev);
750         if (IS_ERR(th))
751                 RETURN(PTR_ERR(th));
752
753         rc = dt_declare_version_set(env, obj, th);
754         if (rc)
755                 GOTO(out, rc);
756
757         rc = dt_trans_start_local(env, dev, th);
758         if (rc)
759                 GOTO(out, rc);
760         th->th_sync = 1;
761
762         dt_version_set(env, obj, ver, th);
763         EXIT;
764 out:
765         dt_trans_stop(env, dev, th);
766         return rc;
767 }
768
769 /*
770  * Write a global record
771  *
772  * \param env - is the environment passed by the caller
773  * \param obj - is the on-disk global index to be updated
774  * \param id  - index to be updated
775  * \param rec - record to be written
776  */
777 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
778                           __u64 id, struct lquota_glb_rec *rec)
779 {
780         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
781         struct thandle          *th;
782         struct dt_key           *key = (struct dt_key *)&id;
783         int                      rc;
784         ENTRY;
785
786         th = dt_trans_create(env, dev);
787         if (IS_ERR(th))
788                 RETURN(PTR_ERR(th));
789
790         /* the entry with 0 key can always be found in IAM file. */
791         if (id == 0) {
792                 rc = dt_declare_delete(env, obj, key, th);
793                 if (rc)
794                         GOTO(out, rc);
795         }
796
797         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
798         if (rc)
799                 GOTO(out, rc);
800
801         rc = dt_trans_start_local(env, dev, th);
802         if (rc)
803                 GOTO(out, rc);
804
805         dt_write_lock(env, obj, 0);
806
807         if (id == 0) {
808                 struct lquota_glb_rec *tmp;
809
810                 OBD_ALLOC_PTR(tmp);
811                 if (tmp == NULL)
812                         GOTO(out_lock, rc = -ENOMEM);
813
814                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key);
815
816                 OBD_FREE_PTR(tmp);
817                 if (rc == 0) {
818                         rc = dt_delete(env, obj, key, th);
819                         if (rc)
820                                 GOTO(out_lock, rc);
821                 }
822         }
823
824         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th);
825 out_lock:
826         dt_write_unlock(env, obj);
827 out:
828         dt_trans_stop(env, dev, th);
829         RETURN(rc);
830 }