Whamcloud - gitweb
c823873fb625462f74db6d795e49830467694f52
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * The disk API is used by both the QMT and QSD to access/update on-disk index
33  * files. The API consists of the following functions:
34  *
35  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
36  *                                found.
37  * - lquota_disk_glb_find_create: look-up global index file, create it if not
38  *                                found.
39  * - lquota_disk_slv_find:        look-up a slave index file.
40  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
41  *                                required and create the index file on disk if
42  *                                it does not exist.
43  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
44  * - lquota_disk_read:            read quota settings from an index file
45  * - lquota_disk_declare_write:   reserve credits to update a record in an index
46  *                                file
47  * - lquota_disk_write:           update a record in an index file
48  * - lquota_disk_update_ver:      update version of an index file
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include "lquota_internal.h"
54
55 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
56
57 /*
58  * Helper function looking up & creating if not found an index file with a
59  * dynamic fid.
60  */
61 static struct dt_object *
62 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
63                         struct dt_object *parent, struct lu_fid *fid,
64                         const struct dt_index_features *idx_feat,
65                         char *name)
66 {
67         struct lquota_thread_info *qti = lquota_info(env);
68         struct dt_object *obj;
69         struct local_oid_storage *los;
70         int rc;
71         ENTRY;
72
73         /* Set up local storage */
74         rc = local_oid_storage_init(env, dev, fid, &los);
75         if (rc)
76                 RETURN(ERR_PTR(rc));
77
78         /* lookup/create slave index file */
79         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
80                                          idx_feat);
81         if (IS_ERR(obj))
82                 GOTO(out, obj);
83
84         /* local_oid_storage_fini() will finalize the local storage device,
85          * we have to open the object in another device stack */
86         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
87         dt_object_put_nocache(env, obj);
88         obj = dt_locate(env, dev, &qti->qti_fid);
89         if (IS_ERR(obj))
90                 GOTO(out, obj);
91 out:
92         local_oid_storage_fini(env, los);
93         RETURN(obj);
94 }
95
96 /*
97  * helper function to generate the filename associated with a slave index file
98  */
99 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
100                                            struct obd_uuid *uuid,
101                                            char *filename)
102 {
103         char    *name, *uuid_str;
104
105         /* In most case, the uuid is NULL terminated */
106         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
107                 OBD_ALLOC(uuid_str, sizeof(*uuid));
108                 if (uuid_str == NULL)
109                         RETURN(-ENOMEM);
110                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
111         } else {
112                 uuid_str = (char *)uuid->uuid;
113         }
114
115         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
116          * the filesystem name in case this one is changed in the future */
117         name = strrchr(uuid_str, '-');
118         if (name == NULL) {
119                 name = strrchr(uuid_str, ':');
120                 if (name == NULL) {
121                         CERROR("Failed to extract extract filesystem "
122                                "name from UUID %s\n", uuid_str);
123                         if (uuid_str != uuid->uuid)
124                                 OBD_FREE(uuid_str, sizeof(*uuid));
125                         return -EINVAL;
126                 }
127         }
128         name++;
129
130         /* the filename is composed of the most signicant bits of the global
131          * FID, that's to say the oid which encodes the pool id, pool type and
132          * quota type, followed by the export UUID */
133         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
134
135         if (uuid_str != uuid->uuid)
136                 OBD_FREE(uuid_str, sizeof(*uuid));
137
138         return 0;
139 }
140
141 /*
142  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
143  * QSD instance. This function is also used to create per-pool directory on
144  * the quota master.
145  * The directory is created with a local sequence if it does not exist already.
146  * This function is called at ->ldo_prepare time when the full device stack is
147  * configured.
148  *
149  * \param env  - is the environment passed by the caller
150  * \param dev  - is the dt_device where to create the quota directory
151  * \param parent  - is the parent directory. If not specified, the directory
152  *                  will be created under the root directory
153  * \param name - is the name of quota directory to be created
154  *
155  * \retval     - pointer to quota root dt_object on success, appropriate error
156  *               on failure
157  */
158 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
159                                               struct dt_device *dev,
160                                               struct dt_object *parent,
161                                               const char *name)
162 {
163         struct lquota_thread_info       *qti = lquota_info(env);
164         struct dt_object                *qt_dir = NULL;
165         struct local_oid_storage        *los = NULL;
166         int                              rc;
167         ENTRY;
168
169         /* Set up local storage to create the quota directory.
170          * We use the sequence reserved for local named objects */
171         lu_local_name_obj_fid(&qti->qti_fid, 1);
172         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
173         if (rc)
174                 RETURN(ERR_PTR(rc));
175
176         if (parent == NULL) {
177                 /* Fetch dt object associated with root directory */
178                 rc = dt_root_get(env, dev, &qti->qti_fid);
179                 if (rc)
180                         GOTO(out, rc);
181
182                 parent = dt_locate_at(env, dev, &qti->qti_fid,
183                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
184                 if (IS_ERR(parent))
185                         GOTO(out, rc = PTR_ERR(parent));
186         } else {
187                 lu_object_get(&parent->do_lu);
188         }
189
190         /* create quota directory to be used for all quota index files */
191         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
192                                            S_IRUGO | S_IWUSR | S_IXUGO);
193         if (IS_ERR(qt_dir))
194                 GOTO(out, rc = PTR_ERR(qt_dir));
195
196         /* local_oid_storage_fini() will finalize the local storage device,
197          * we have to open the object in another device stack */
198         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
199         dt_object_put_nocache(env, qt_dir);
200         qt_dir = dt_locate(env, dev, &qti->qti_fid);
201         if (IS_ERR(qt_dir))
202                 GOTO(out, rc = PTR_ERR(qt_dir));
203
204         if (!dt_try_as_dir(env, qt_dir))
205                 GOTO(out, rc = -ENOTDIR);
206         EXIT;
207 out:
208         if (parent != NULL && !IS_ERR(parent))
209                 dt_object_put(env, parent);
210         if (los != NULL)
211                 local_oid_storage_fini(env, los);
212         if (rc) {
213                 if (qt_dir != NULL && !IS_ERR(qt_dir))
214                         dt_object_put(env, qt_dir);
215                 qt_dir = ERR_PTR(rc);
216         }
217         return qt_dir;
218 }
219
220 /*
221  * Look-up/create a global index file.
222  *
223  * \param env - is the environment passed by the caller
224  * \parap dev - is the dt_device where to lookup/create the global index file
225  * \param parent - is the parent directory where to create the global index if
226  *                 not found
227  * \param fid - is the fid of the global index to be looked up/created
228  * \parap local - indicates whether the index should be created with a local
229  *                generated fid or with \fid
230  *
231  * \retval     - pointer to the dt_object of the global index on success,
232  *               appropriate error on failure
233  */
234 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
235                                               struct dt_device *dev,
236                                               struct dt_object *parent,
237                                               struct lu_fid *fid, bool local)
238 {
239         struct lquota_thread_info       *qti = lquota_info(env);
240         struct dt_object                *glb_idx;
241         const struct dt_index_features  *idx_feat;
242         ENTRY;
243
244         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
245                local ? "local " : "", PFID(fid));
246
247         idx_feat = &dt_quota_glb_features;
248
249         /* the filename is composed of the most signicant bits of the FID,
250          * that's to say the oid which encodes the pool id, pool type and quota
251          * type */
252         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
253
254         if (local) {
255                 /* We use the sequence reserved for local named objects */
256                 lu_local_name_obj_fid(&qti->qti_fid, 1);
257                 glb_idx = lquota_disk_find_create(env, dev, parent,
258                                                   &qti->qti_fid, idx_feat,
259                                                   qti->qti_buf);
260         } else {
261                 /* look-up/create global index on disk */
262                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
263                                                               parent,
264                                                               qti->qti_buf,
265                                                               LQUOTA_MODE,
266                                                               idx_feat);
267         }
268
269         if (IS_ERR(glb_idx)) {
270                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
271                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
272                        PFID(fid), PTR_ERR(glb_idx), local);
273                 RETURN(glb_idx);
274         }
275
276         /* install index operation vector */
277         if (glb_idx->do_index_ops == NULL) {
278                 int rc;
279
280                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
281                 if (rc) {
282                         CERROR("%s: failed to setup index operations for "DFID
283                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
284                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
285                         dt_object_put(env, glb_idx);
286                         glb_idx = ERR_PTR(rc);
287                 }
288         }
289
290         RETURN(glb_idx);
291 }
292
293 /*
294  * Look-up a slave index file.
295  *
296  * \param env - is the environment passed by the caller
297  * \param dev - is the backend dt_device where to look-up/create the slave index
298  * \param parent - is the parent directory where to lookup the slave index
299  * \param glb_fid - is the fid of the global index file associated with this
300  *                  slave index.
301  * \param uuid    - is the uuid of slave which is (re)connecting to the master
302  *                  target
303  *
304  * \retval     - pointer to the dt_object of the slave index on success,
305  *               appropriate error on failure
306  */
307 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
308                                        struct dt_device *dev,
309                                        struct dt_object *parent,
310                                        const struct lu_fid *glb_fid,
311                                        struct obd_uuid *uuid)
312 {
313         struct lquota_thread_info       *qti = lquota_info(env);
314         struct dt_object                *slv_idx;
315         int                              rc;
316         ENTRY;
317
318         LASSERT(uuid != NULL);
319
320         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
321                obd_uuid2str(uuid));
322
323         /* generate filename associated with the slave */
324         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
325         if (rc)
326                 RETURN(ERR_PTR(rc));
327
328         /* lookup slave index file */
329         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
330         if (rc)
331                 RETURN(ERR_PTR(rc));
332
333         /* name is found, get the object */
334         slv_idx = dt_locate(env, dev, &qti->qti_fid);
335         if (IS_ERR(slv_idx))
336                 RETURN(slv_idx);
337
338         if (slv_idx->do_index_ops == NULL) {
339                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
340                                                    &dt_quota_slv_features);
341                 if (rc) {
342                         CERROR("%s: failed to setup slave index operations for "
343                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
344                                obd_uuid2str(uuid), rc);
345                         dt_object_put(env, slv_idx);
346                         slv_idx = ERR_PTR(rc);
347                 }
348         }
349
350         RETURN(slv_idx);
351 }
352
353 /*
354  * Look-up a slave index file. If the slave index isn't found:
355  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
356  *   create the index.
357  * - otherwise, we create the index file with a local reserved FID (see
358  *   lquota_local_oid)
359  *
360  * \param env - is the environment passed by the caller
361  * \param dev - is the backend dt_device where to look-up/create the slave index
362  * \param parent - is the parent directory where to create the slave index if
363  *                 it does not exist already
364  * \param glb_fid - is the fid of the global index file associated with this
365  *                  slave index.
366  * \param uuid    - is the uuid of slave which is (re)connecting to the master
367  *                  target
368  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
369  *                  & LQUOTA_GRP_OID) for the slave index creation or to
370  *                  allocate a new fid from sequence FID_SEQ_QUOTA
371  *
372  * \retval     - pointer to the dt_object of the slave index on success,
373  *               appropriate error on failure
374  */
375 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
376                                               struct dt_device *dev,
377                                               struct dt_object *parent,
378                                               struct lu_fid *glb_fid,
379                                               struct obd_uuid *uuid,
380                                               bool local)
381 {
382         struct lquota_thread_info       *qti = lquota_info(env);
383         struct dt_object                *slv_idx;
384         int                              rc;
385         ENTRY;
386
387         LASSERT(uuid != NULL);
388
389         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
390                obd_uuid2str(uuid));
391
392         /* generate filename associated with the slave */
393         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
394         if (rc)
395                 RETURN(ERR_PTR(rc));
396
397         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
398          * through the network */
399         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
400         qti->qti_fid.f_ver = 0;
401         if (local) {
402                 int type;
403
404                 rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
405                 if (rc)
406                         RETURN(ERR_PTR(rc));
407
408                 /* use predefined fid in the reserved oid list */
409                 qti->qti_fid.f_oid = qtype2slv_oid(type);
410
411                 slv_idx = local_index_find_or_create_with_fid(env, dev,
412                                                               &qti->qti_fid,
413                                                               parent,
414                                                               qti->qti_buf,
415                                                               LQUOTA_MODE,
416                                                         &dt_quota_slv_features);
417         } else {
418                 /* allocate fid dynamically if index does not exist already */
419                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
420
421                 /* lookup/create slave index file */
422                 slv_idx = lquota_disk_find_create(env, dev, parent,
423                                                   &qti->qti_fid,
424                                                   &dt_quota_slv_features,
425                                                   qti->qti_buf);
426         }
427
428         if (IS_ERR(slv_idx))
429                 RETURN(slv_idx);
430
431         /* install index operation vector */
432         if (slv_idx->do_index_ops == NULL) {
433                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
434                                                    &dt_quota_slv_features);
435                 if (rc) {
436                         CERROR("%s: failed to setup index operations for "DFID
437                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
438                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
439                         dt_object_put(env, slv_idx);
440                         slv_idx = ERR_PTR(rc);
441                 }
442         }
443
444         RETURN(slv_idx);
445 }
446
447 /*
448  * Iterate over all slave index files associated with global index \glb_fid and
449  * invoke a callback function for each slave index file.
450  *
451  * \param env     - is the environment passed by the caller
452  * \param parent  - is the parent directory where the slave index files are
453  *                  stored
454  * \param glb_fid - is the fid of the global index file associated with the
455  *                  slave indexes to scan
456  * \param func    - is the callback function to call each time a slave index
457  *                  file is found
458  * \param arg     - is an opaq argument passed to the callback function \func
459  */
460 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
461                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
462                              void *arg)
463 {
464         struct lquota_thread_info       *qti = lquota_info(env);
465         struct dt_it                    *it;
466         const struct dt_it_ops          *iops;
467         char                            *name;
468         int                              rc;
469         ENTRY;
470
471         OBD_ALLOC(name, LQUOTA_NAME_MAX);
472         if (name == NULL)
473                 RETURN(-ENOMEM);
474
475         /* filename associated with slave index files are prefixed with the most
476          * signicant bits of the global FID */
477         sprintf(name, "0x%x-", glb_fid->f_oid);
478
479         iops = &parent->do_index_ops->dio_it;
480         it = iops->init(env, parent, 0);
481         if (IS_ERR(it)) {
482                 OBD_FREE(name, LQUOTA_NAME_MAX);
483                 RETURN(PTR_ERR(it));
484         }
485
486         rc = iops->load(env, it, 0);
487         if (rc == 0) {
488                 /*
489                  * Iterator didn't find record with exactly the key requested.
490                  *
491                  * It is currently either
492                  *
493                  *     - positioned above record with key less than
494                  *     requested---skip it.
495                  *
496                  *     - or not positioned at all (is in IAM_IT_SKEWED
497                  *     state)---position it on the next item.
498                  */
499                 rc = iops->next(env, it);
500         } else if (rc > 0)
501                 rc = 0;
502
503         while (rc == 0) {
504                 struct dt_key   *key;
505                 int              len;
506
507                 len = iops->key_size(env, it);
508                 /* IAM iterator can return record with zero len. */
509                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
510                         goto next;
511
512                 key = iops->key(env, it);
513                 if (IS_ERR(key)) {
514                         rc = PTR_ERR(key);
515                         break;
516                 }
517
518                 if (strncmp((char *)key, name, strlen(name)) != 0)
519                         goto next;
520
521                 /* ldiskfs OSD returns filename as stored in directory entry
522                  * which does not end up with '\0' */
523                 memcpy(&qti->qti_buf, key, len);
524                 qti->qti_buf[len] = '\0';
525
526                 /* lookup fid associated with this slave index file */
527                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
528                 if (rc)
529                         break;
530
531                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
532                         goto next;
533
534                 rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
535                 if (rc)
536                         break;
537 next:
538                 do {
539                         rc = iops->next(env, it);
540                 } while (rc == -ESTALE);
541         }
542
543         iops->put(env, it);
544         iops->fini(env, it);
545         OBD_FREE(name, LQUOTA_NAME_MAX);
546         if (rc > 0)
547                 rc = 0;
548         RETURN(rc);
549 }
550
551 /*
552  * Retrieve quota settings from disk for a particular identifier.
553  *
554  * \param env - is the environment passed by the caller
555  * \param obj - is the on-disk index where quota settings are stored.
556  * \param id  - is the key to be updated
557  * \param rec - is the output record where to store quota settings.
558  *
559  * \retval    - 0 on success, appropriate error on failure
560  */
561 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
562                      union lquota_id *id, struct dt_rec *rec)
563 {
564         int     rc;
565         ENTRY;
566
567         LASSERT(dt_object_exists(obj));
568         LASSERT(obj->do_index_ops != NULL);
569
570         /* lookup on-disk record from index file */
571         dt_read_lock(env, obj, 0);
572         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid);
573         dt_read_unlock(env, obj);
574
575         RETURN(rc);
576 }
577
578 /*
579  * Reserve enough credits to update a record in a quota index file.
580  *
581  * \param env - is the environment passed by the caller
582  * \param th  - is the transaction to use for disk writes
583  * \param obj - is the on-disk index where quota settings are stored.
584  * \param id  - is the key to be updated
585  *
586  * \retval    - 0 on success, appropriate error on failure
587  */
588 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
589                               struct dt_object *obj, union lquota_id *id)
590 {
591         struct lquota_thread_info       *qti = lquota_info(env);
592         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
593         int                              rc;
594         ENTRY;
595
596         LASSERT(dt_object_exists(obj));
597         LASSERT(obj->do_index_ops != NULL);
598
599         /* speculative delete declaration in case there is already an existing
600          * record in the index */
601         rc = dt_declare_delete(env, obj, key, th);
602         if (rc)
603                 RETURN(rc);
604
605         /* declare insertion of updated record */
606         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
607                                th);
608         if (rc)
609                 RETURN(rc);
610
611         /* we might have to update the version of the global index too */
612         rc = dt_declare_version_set(env, obj, th);
613
614         RETURN(rc);
615 }
616
617 /*
618  * Update a record in a quota index file.
619  *
620  * \param env - is the environment passed by the caller
621  * \param th  - is the transaction to use for disk writes
622  * \param obj - is the on-disk index to be updated.
623  * \param id  - is the key to be updated
624  * \param rec - is the input record containing the new quota settings.
625  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
626  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
627  *                used to return the new version of the index when
628  *                LQUOTA_BUMP_VER is set.
629  *
630  * \retval    - 0 on success, appropriate error on failure
631  */
632 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
633                       struct dt_object *obj, union lquota_id *id,
634                       struct dt_rec *rec, __u32 flags, __u64 *ver)
635 {
636         struct lquota_thread_info       *qti = lquota_info(env);
637         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
638         int                              rc;
639         ENTRY;
640
641         LASSERT(dt_object_exists(obj));
642         LASSERT(obj->do_index_ops != NULL);
643
644         /* lock index */
645         dt_write_lock(env, obj, 0);
646
647         /* check whether there is already an existing record for this ID */
648         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
649         if (rc == 0) {
650                 /* delete existing record in order to replace it */
651                 rc = dt_delete(env, obj, key, th);
652                 if (rc)
653                         GOTO(out, rc);
654         } else if (rc == -ENOENT) {
655                 /* probably first insert */
656                 rc = 0;
657         } else {
658                 GOTO(out, rc);
659         }
660
661         if (rec != NULL) {
662                 /* insert record with updated quota settings */
663                 rc = dt_insert(env, obj, rec, key, th);
664                 if (rc) {
665                         /* try to insert the old one */
666                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
667                                        key, th);
668                         LASSERTF(rc == 0, "failed to insert record in quota "
669                                  "index "DFID"\n",
670                                  PFID(lu_object_fid(&obj->do_lu)));
671                         GOTO(out, rc);
672                 }
673         }
674
675         if (flags != 0) {
676                 LASSERT(ver);
677                 if (flags & LQUOTA_BUMP_VER) {
678                         /* caller wants to bump the version, let's first read
679                          * it */
680                         *ver = dt_version_get(env, obj);
681                         (*ver)++;
682                 } else {
683                         LASSERT(flags & LQUOTA_SET_VER);
684                 }
685                 dt_version_set(env, obj, *ver, th);
686         }
687
688         EXIT;
689 out:
690         dt_write_unlock(env, obj);
691         return rc;
692 }
693
694 /*
695  * Update version of an index file
696  *
697  * \param env - is the environment passed by the caller
698  * \param dev - is the backend dt device storing the index file
699  * \param obj - is the on-disk index that should be updated
700  * \param ver - is the new version
701  */
702 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
703                            struct dt_object *obj, __u64 ver)
704 {
705         struct thandle  *th;
706         int              rc;
707         ENTRY;
708
709         th = dt_trans_create(env, dev);
710         if (IS_ERR(th))
711                 RETURN(PTR_ERR(th));
712
713         rc = dt_declare_version_set(env, obj, th);
714         if (rc)
715                 GOTO(out, rc);
716
717         rc = dt_trans_start_local(env, dev, th);
718         if (rc)
719                 GOTO(out, rc);
720         th->th_sync = 1;
721
722         dt_version_set(env, obj, ver, th);
723         EXIT;
724 out:
725         dt_trans_stop(env, dev, th);
726         return rc;
727 }
728
729 /*
730  * Write a global record
731  *
732  * \param env - is the environment passed by the caller
733  * \param obj - is the on-disk global index to be updated
734  * \param id  - index to be updated
735  * \param rec - record to be written
736  */
737 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
738                           __u64 id, struct lquota_glb_rec *rec)
739 {
740         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
741         struct thandle          *th;
742         struct dt_key           *key = (struct dt_key *)&id;
743         int                      rc;
744         ENTRY;
745
746         th = dt_trans_create(env, dev);
747         if (IS_ERR(th))
748                 RETURN(PTR_ERR(th));
749
750         /* the entry with 0 key can always be found in IAM file. */
751         if (id == 0) {
752                 rc = dt_declare_delete(env, obj, key, th);
753                 if (rc)
754                         GOTO(out, rc);
755         }
756
757         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
758         if (rc)
759                 GOTO(out, rc);
760
761         rc = dt_trans_start_local(env, dev, th);
762         if (rc)
763                 GOTO(out, rc);
764
765         dt_write_lock(env, obj, 0);
766
767         if (id == 0) {
768                 struct lquota_glb_rec *tmp;
769
770                 OBD_ALLOC_PTR(tmp);
771                 if (tmp == NULL)
772                         GOTO(out_lock, rc = -ENOMEM);
773
774                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key);
775
776                 OBD_FREE_PTR(tmp);
777                 if (rc == 0) {
778                         rc = dt_delete(env, obj, key, th);
779                         if (rc)
780                                 GOTO(out_lock, rc);
781                 }
782                 rc = 0;
783         }
784
785         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th);
786 out_lock:
787         dt_write_unlock(env, obj);
788 out:
789         dt_trans_stop(env, dev, th);
790         RETURN(rc);
791 }