Whamcloud - gitweb
a77bfa4ebda6cd9582daf48c6ae7637c4c4dd904
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2014, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * The disk API is used by both the QMT and QSD to access/update on-disk index
33  * files. The API consists of the following functions:
34  *
35  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
36  *                                found.
37  * - lquota_disk_glb_find_create: look-up global index file, create it if not
38  *                                found.
39  * - lquota_disk_slv_find:        look-up a slave index file.
40  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
41  *                                required and create the index file on disk if
42  *                                it does not exist.
43  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
44  * - lquota_disk_read:            read quota settings from an index file
45  * - lquota_disk_declare_write:   reserve credits to update a record in an index
46  *                                file
47  * - lquota_disk_write:           update a record in an index file
48  * - lquota_disk_update_ver:      update version of an index file
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include "lquota_internal.h"
54
55 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
56
57 /*
58  * Helper function looking up & creating if not found an index file with a
59  * dynamic fid.
60  */
61 static struct dt_object *
62 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
63                         struct dt_object *parent, struct lu_fid *fid,
64                         const struct dt_index_features *idx_feat,
65                         char *name)
66 {
67         struct lquota_thread_info       *qti = lquota_info(env);
68         struct dt_object                *obj;
69         struct local_oid_storage        *los;
70         int                              rc;
71         ENTRY;
72
73         /* Set up local storage */
74         rc = local_oid_storage_init(env, dev, fid, &los);
75         if (rc)
76                 RETURN(ERR_PTR(rc));
77
78         /* lookup/create slave index file */
79         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
80                                          idx_feat);
81         if (IS_ERR(obj))
82                 GOTO(out, obj);
83
84         /* local_oid_storage_fini() will finalize the local storage device,
85          * we have to open the object in another device stack */
86         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
87         lu_object_put_nocache(env, &obj->do_lu);
88         obj = dt_locate(env, dev, &qti->qti_fid);
89         if (IS_ERR(obj))
90                 GOTO(out, obj);
91 out:
92         local_oid_storage_fini(env, los);
93         RETURN(obj);
94 }
95
96 /*
97  * helper function to generate the filename associated with a slave index file
98  */
99 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
100                                            struct obd_uuid *uuid,
101                                            char *filename)
102 {
103         char    *name, *uuid_str;
104
105         /* In most case, the uuid is NULL terminated */
106         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
107                 OBD_ALLOC(uuid_str, sizeof(*uuid));
108                 if (uuid_str == NULL)
109                         RETURN(-ENOMEM);
110                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
111         } else {
112                 uuid_str = (char *)uuid->uuid;
113         }
114
115         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
116          * the filesystem name in case this one is changed in the future */
117         name = strrchr(uuid_str, '-');
118         if (name == NULL) {
119                 name = strrchr(uuid_str, ':');
120                 if (name == NULL) {
121                         CERROR("Failed to extract extract filesystem "
122                                "name from UUID %s\n", uuid_str);
123                         if (uuid_str != uuid->uuid)
124                                 OBD_FREE(uuid_str, sizeof(*uuid));
125                         return -EINVAL;
126                 }
127         }
128         name++;
129
130         /* the filename is composed of the most signicant bits of the global
131          * FID, that's to say the oid which encodes the pool id, pool type and
132          * quota type, followed by the export UUID */
133         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
134
135         if (uuid_str != uuid->uuid)
136                 OBD_FREE(uuid_str, sizeof(*uuid));
137
138         return 0;
139 }
140
141 /*
142  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
143  * QSD instance. This function is also used to create per-pool directory on
144  * the quota master.
145  * The directory is created with a local sequence if it does not exist already.
146  * This function is called at ->ldo_prepare time when the full device stack is
147  * configured.
148  *
149  * \param env  - is the environment passed by the caller
150  * \param dev  - is the dt_device where to create the quota directory
151  * \param parent  - is the parent directory. If not specified, the directory
152  *                  will be created under the root directory
153  * \param name - is the name of quota directory to be created
154  *
155  * \retval     - pointer to quota root dt_object on success, appropriate error
156  *               on failure
157  */
158 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
159                                               struct dt_device *dev,
160                                               struct dt_object *parent,
161                                               const char *name)
162 {
163         struct lquota_thread_info       *qti = lquota_info(env);
164         struct dt_object                *qt_dir = NULL;
165         struct local_oid_storage        *los = NULL;
166         int                              rc;
167         ENTRY;
168
169         /* Set up local storage to create the quota directory.
170          * We use the sequence reserved for local named objects */
171         lu_local_name_obj_fid(&qti->qti_fid, 1);
172         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
173         if (rc)
174                 RETURN(ERR_PTR(rc));
175
176         if (parent == NULL) {
177                 /* Fetch dt object associated with root directory */
178                 rc = dt_root_get(env, dev, &qti->qti_fid);
179                 if (rc)
180                         GOTO(out, rc);
181
182                 parent = dt_locate_at(env, dev, &qti->qti_fid,
183                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
184                 if (IS_ERR(parent))
185                         GOTO(out, rc = PTR_ERR(parent));
186         } else {
187                 lu_object_get(&parent->do_lu);
188         }
189
190         /* create quota directory to be used for all quota index files */
191         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
192                                            S_IRUGO | S_IWUSR | S_IXUGO);
193         if (IS_ERR(qt_dir))
194                 GOTO(out, rc = PTR_ERR(qt_dir));
195
196         /* local_oid_storage_fini() will finalize the local storage device,
197          * we have to open the object in another device stack */
198         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
199         lu_object_put_nocache(env, &qt_dir->do_lu);
200         qt_dir = dt_locate(env, dev, &qti->qti_fid);
201         if (IS_ERR(qt_dir))
202                 GOTO(out, rc = PTR_ERR(qt_dir));
203
204         if (!dt_try_as_dir(env, qt_dir))
205                 GOTO(out, rc = -ENOTDIR);
206         EXIT;
207 out:
208         if (parent != NULL && !IS_ERR(parent))
209                 lu_object_put(env, &parent->do_lu);
210         if (los != NULL)
211                 local_oid_storage_fini(env, los);
212         if (rc) {
213                 if (qt_dir != NULL && !IS_ERR(qt_dir))
214                         lu_object_put(env, &qt_dir->do_lu);
215                 qt_dir = ERR_PTR(rc);
216         }
217         return qt_dir;
218 }
219
220 /*
221  * Look-up/create a global index file.
222  *
223  * \param env - is the environment passed by the caller
224  * \parap dev - is the dt_device where to lookup/create the global index file
225  * \param parent - is the parent directory where to create the global index if
226  *                 not found
227  * \param fid - is the fid of the global index to be looked up/created
228  * \parap local - indicates whether the index should be created with a local
229  *                generated fid or with \fid
230  *
231  * \retval     - pointer to the dt_object of the global index on success,
232  *               appropriate error on failure
233  */
234 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
235                                               struct dt_device *dev,
236                                               struct dt_object *parent,
237                                               struct lu_fid *fid, bool local)
238 {
239         struct lquota_thread_info       *qti = lquota_info(env);
240         struct dt_object                *glb_idx;
241         const struct dt_index_features  *idx_feat;
242         ENTRY;
243
244         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
245                local ? "local " : "", PFID(fid));
246
247         idx_feat = &dt_quota_glb_features;
248
249         /* the filename is composed of the most signicant bits of the FID,
250          * that's to say the oid which encodes the pool id, pool type and quota
251          * type */
252         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
253
254         if (local) {
255                 /* We use the sequence reserved for local named objects */
256                 lu_local_name_obj_fid(&qti->qti_fid, 1);
257                 glb_idx = lquota_disk_find_create(env, dev, parent,
258                                                   &qti->qti_fid, idx_feat,
259                                                   qti->qti_buf);
260         } else {
261                 /* look-up/create global index on disk */
262                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
263                                                               parent,
264                                                               qti->qti_buf,
265                                                               LQUOTA_MODE,
266                                                               idx_feat);
267         }
268
269         if (IS_ERR(glb_idx)) {
270                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
271                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
272                        PFID(fid), PTR_ERR(glb_idx), local);
273                 RETURN(glb_idx);
274         }
275
276         /* install index operation vector */
277         if (glb_idx->do_index_ops == NULL) {
278                 int rc;
279
280                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
281                 if (rc) {
282                         CERROR("%s: failed to setup index operations for "DFID
283                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
284                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
285                         lu_object_put(env, &glb_idx->do_lu);
286                         glb_idx = ERR_PTR(rc);
287                 }
288         }
289
290         RETURN(glb_idx);
291 }
292
293 /*
294  * Look-up a slave index file.
295  *
296  * \param env - is the environment passed by the caller
297  * \param dev - is the backend dt_device where to look-up/create the slave index
298  * \param parent - is the parent directory where to lookup the slave index
299  * \param glb_fid - is the fid of the global index file associated with this
300  *                  slave index.
301  * \param uuid    - is the uuid of slave which is (re)connecting to the master
302  *                  target
303  *
304  * \retval     - pointer to the dt_object of the slave index on success,
305  *               appropriate error on failure
306  */
307 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
308                                        struct dt_device *dev,
309                                        struct dt_object *parent,
310                                        const struct lu_fid *glb_fid,
311                                        struct obd_uuid *uuid)
312 {
313         struct lquota_thread_info       *qti = lquota_info(env);
314         struct dt_object                *slv_idx;
315         int                              rc;
316         ENTRY;
317
318         LASSERT(uuid != NULL);
319
320         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
321                obd_uuid2str(uuid));
322
323         /* generate filename associated with the slave */
324         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
325         if (rc)
326                 RETURN(ERR_PTR(rc));
327
328         /* lookup slave index file */
329         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
330         if (rc)
331                 RETURN(ERR_PTR(rc));
332
333         /* name is found, get the object */
334         slv_idx = dt_locate(env, dev, &qti->qti_fid);
335         if (IS_ERR(slv_idx))
336                 RETURN(slv_idx);
337
338         if (slv_idx->do_index_ops == NULL) {
339                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
340                                                    &dt_quota_slv_features);
341                 if (rc) {
342                         CERROR("%s: failed to setup slave index operations for "
343                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
344                                obd_uuid2str(uuid), rc);
345                         lu_object_put(env, &slv_idx->do_lu);
346                         slv_idx = ERR_PTR(rc);
347                 }
348         }
349
350         RETURN(slv_idx);
351 }
352
353 /*
354  * Look-up a slave index file. If the slave index isn't found:
355  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
356  *   create the index.
357  * - otherwise, we create the index file with a local reserved FID (see
358  *   lquota_local_oid)
359  *
360  * \param env - is the environment passed by the caller
361  * \param dev - is the backend dt_device where to look-up/create the slave index
362  * \param parent - is the parent directory where to create the slave index if
363  *                 it does not exist already
364  * \param glb_fid - is the fid of the global index file associated with this
365  *                  slave index.
366  * \param uuid    - is the uuid of slave which is (re)connecting to the master
367  *                  target
368  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
369  *                  & LQUOTA_GRP_OID) for the slave index creation or to
370  *                  allocate a new fid from sequence FID_SEQ_QUOTA
371  *
372  * \retval     - pointer to the dt_object of the slave index on success,
373  *               appropriate error on failure
374  */
375 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
376                                               struct dt_device *dev,
377                                               struct dt_object *parent,
378                                               struct lu_fid *glb_fid,
379                                               struct obd_uuid *uuid,
380                                               bool local)
381 {
382         struct lquota_thread_info       *qti = lquota_info(env);
383         struct dt_object                *slv_idx;
384         int                              rc;
385         ENTRY;
386
387         LASSERT(uuid != NULL);
388
389         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
390                obd_uuid2str(uuid));
391
392         /* generate filename associated with the slave */
393         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
394         if (rc)
395                 RETURN(ERR_PTR(rc));
396
397         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
398          * through the network */
399         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
400         qti->qti_fid.f_ver = 0;
401         if (local) {
402                 int type;
403
404                 rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
405                 if (rc)
406                         RETURN(ERR_PTR(rc));
407
408                 /* use predefined fid in the reserved oid list */
409                 qti->qti_fid.f_oid = (type == USRQUOTA) ? LQUOTA_USR_OID
410                                                         : LQUOTA_GRP_OID;
411
412                 slv_idx = local_index_find_or_create_with_fid(env, dev,
413                                                               &qti->qti_fid,
414                                                               parent,
415                                                               qti->qti_buf,
416                                                               LQUOTA_MODE,
417                                                         &dt_quota_slv_features);
418         } else {
419                 /* allocate fid dynamically if index does not exist already */
420                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
421
422                 /* lookup/create slave index file */
423                 slv_idx = lquota_disk_find_create(env, dev, parent,
424                                                   &qti->qti_fid,
425                                                   &dt_quota_slv_features,
426                                                   qti->qti_buf);
427         }
428
429         if (IS_ERR(slv_idx))
430                 RETURN(slv_idx);
431
432         /* install index operation vector */
433         if (slv_idx->do_index_ops == NULL) {
434                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
435                                                    &dt_quota_slv_features);
436                 if (rc) {
437                         CERROR("%s: failed to setup index operations for "DFID
438                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
439                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
440                         lu_object_put(env, &slv_idx->do_lu);
441                         slv_idx = ERR_PTR(rc);
442                 }
443         }
444
445         RETURN(slv_idx);
446 }
447
448 /*
449  * Iterate over all slave index files associated with global index \glb_fid and
450  * invoke a callback function for each slave index file.
451  *
452  * \param env     - is the environment passed by the caller
453  * \param parent  - is the parent directory where the slave index files are
454  *                  stored
455  * \param glb_fid - is the fid of the global index file associated with the
456  *                  slave indexes to scan
457  * \param func    - is the callback function to call each time a slave index
458  *                  file is found
459  * \param arg     - is an opaq argument passed to the callback function \func
460  */
461 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
462                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
463                              void *arg)
464 {
465         struct lquota_thread_info       *qti = lquota_info(env);
466         struct dt_it                    *it;
467         const struct dt_it_ops          *iops;
468         char                            *name;
469         int                              rc;
470         ENTRY;
471
472         OBD_ALLOC(name, LQUOTA_NAME_MAX);
473         if (name == NULL)
474                 RETURN(-ENOMEM);
475
476         /* filename associated with slave index files are prefixed with the most
477          * signicant bits of the global FID */
478         sprintf(name, "0x%x-", glb_fid->f_oid);
479
480         iops = &parent->do_index_ops->dio_it;
481         it = iops->init(env, parent, 0);
482         if (IS_ERR(it)) {
483                 OBD_FREE(name, LQUOTA_NAME_MAX);
484                 RETURN(PTR_ERR(it));
485         }
486
487         rc = iops->load(env, it, 0);
488         if (rc == 0) {
489                 /*
490                  * Iterator didn't find record with exactly the key requested.
491                  *
492                  * It is currently either
493                  *
494                  *     - positioned above record with key less than
495                  *     requested---skip it.
496                  *
497                  *     - or not positioned at all (is in IAM_IT_SKEWED
498                  *     state)---position it on the next item.
499                  */
500                 rc = iops->next(env, it);
501         } else if (rc > 0)
502                 rc = 0;
503
504         while (rc == 0) {
505                 struct dt_key   *key;
506                 int              len;
507
508                 len = iops->key_size(env, it);
509                 /* IAM iterator can return record with zero len. */
510                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
511                         goto next;
512
513                 key = iops->key(env, it);
514                 if (IS_ERR(key)) {
515                         rc = PTR_ERR(key);
516                         break;
517                 }
518
519                 if (strncmp((char *)key, name, strlen(name)) != 0)
520                         goto next;
521
522                 /* ldiskfs OSD returns filename as stored in directory entry
523                  * which does not end up with '\0' */
524                 memcpy(&qti->qti_buf, key, len);
525                 qti->qti_buf[len] = '\0';
526
527                 /* lookup fid associated with this slave index file */
528                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
529                 if (rc)
530                         break;
531
532                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
533                         goto next;
534
535                 rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
536                 if (rc)
537                         break;
538 next:
539                 do {
540                         rc = iops->next(env, it);
541                 } while (rc == -ESTALE);
542         }
543
544         iops->put(env, it);
545         iops->fini(env, it);
546         OBD_FREE(name, LQUOTA_NAME_MAX);
547         if (rc > 0)
548                 rc = 0;
549         RETURN(rc);
550 }
551
552 /*
553  * Retrieve quota settings from disk for a particular identifier.
554  *
555  * \param env - is the environment passed by the caller
556  * \param obj - is the on-disk index where quota settings are stored.
557  * \param id  - is the key to be updated
558  * \param rec - is the output record where to store quota settings.
559  *
560  * \retval    - 0 on success, appropriate error on failure
561  */
562 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
563                      union lquota_id *id, struct dt_rec *rec)
564 {
565         int     rc;
566         ENTRY;
567
568         LASSERT(dt_object_exists(obj));
569         LASSERT(obj->do_index_ops != NULL);
570
571         /* lookup on-disk record from index file */
572         dt_read_lock(env, obj, 0);
573         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid);
574         dt_read_unlock(env, obj);
575
576         RETURN(rc);
577 }
578
579 /*
580  * Reserve enough credits to update a record in a quota index file.
581  *
582  * \param env - is the environment passed by the caller
583  * \param th  - is the transaction to use for disk writes
584  * \param obj - is the on-disk index where quota settings are stored.
585  * \param id  - is the key to be updated
586  *
587  * \retval    - 0 on success, appropriate error on failure
588  */
589 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
590                               struct dt_object *obj, union lquota_id *id)
591 {
592         struct lquota_thread_info       *qti = lquota_info(env);
593         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
594         int                              rc;
595         ENTRY;
596
597         LASSERT(dt_object_exists(obj));
598         LASSERT(obj->do_index_ops != NULL);
599
600         /* speculative delete declaration in case there is already an existing
601          * record in the index */
602         rc = dt_declare_delete(env, obj, key, th);
603         if (rc)
604                 RETURN(rc);
605
606         /* declare insertion of updated record */
607         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
608                                th);
609         if (rc)
610                 RETURN(rc);
611
612         /* we might have to update the version of the global index too */
613         rc = dt_declare_version_set(env, obj, th);
614
615         RETURN(rc);
616 }
617
618 /*
619  * Update a record in a quota index file.
620  *
621  * \param env - is the environment passed by the caller
622  * \param th  - is the transaction to use for disk writes
623  * \param obj - is the on-disk index to be updated.
624  * \param id  - is the key to be updated
625  * \param rec - is the input record containing the new quota settings.
626  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
627  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
628  *                used to return the new version of the index when
629  *                LQUOTA_BUMP_VER is set.
630  *
631  * \retval    - 0 on success, appropriate error on failure
632  */
633 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
634                       struct dt_object *obj, union lquota_id *id,
635                       struct dt_rec *rec, __u32 flags, __u64 *ver)
636 {
637         struct lquota_thread_info       *qti = lquota_info(env);
638         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
639         int                              rc;
640         ENTRY;
641
642         LASSERT(dt_object_exists(obj));
643         LASSERT(obj->do_index_ops != NULL);
644
645         /* lock index */
646         dt_write_lock(env, obj, 0);
647
648         /* check whether there is already an existing record for this ID */
649         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
650         if (rc == 0) {
651                 /* delete existing record in order to replace it */
652                 rc = dt_delete(env, obj, key, th);
653                 if (rc)
654                         GOTO(out, rc);
655         } else if (rc == -ENOENT) {
656                 /* probably first insert */
657                 rc = 0;
658         } else {
659                 GOTO(out, rc);
660         }
661
662         if (rec != NULL) {
663                 /* insert record with updated quota settings */
664                 rc = dt_insert(env, obj, rec, key, th, 1);
665                 if (rc) {
666                         /* try to insert the old one */
667                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
668                                        key, th, 1);
669                         LASSERTF(rc == 0, "failed to insert record in quota "
670                                  "index "DFID"\n",
671                                  PFID(lu_object_fid(&obj->do_lu)));
672                         GOTO(out, rc);
673                 }
674         }
675
676         if (flags != 0) {
677                 LASSERT(ver);
678                 if (flags & LQUOTA_BUMP_VER) {
679                         /* caller wants to bump the version, let's first read
680                          * it */
681                         *ver = dt_version_get(env, obj);
682                         (*ver)++;
683                 } else {
684                         LASSERT(flags & LQUOTA_SET_VER);
685                 }
686                 dt_version_set(env, obj, *ver, th);
687         }
688
689         EXIT;
690 out:
691         dt_write_unlock(env, obj);
692         return rc;
693 }
694
695 /*
696  * Update version of an index file
697  *
698  * \param env - is the environment passed by the caller
699  * \param dev - is the backend dt device storing the index file
700  * \param obj - is the on-disk index that should be updated
701  * \param ver - is the new version
702  */
703 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
704                            struct dt_object *obj, __u64 ver)
705 {
706         struct thandle  *th;
707         int              rc;
708         ENTRY;
709
710         th = dt_trans_create(env, dev);
711         if (IS_ERR(th))
712                 RETURN(PTR_ERR(th));
713
714         rc = dt_declare_version_set(env, obj, th);
715         if (rc)
716                 GOTO(out, rc);
717
718         rc = dt_trans_start_local(env, dev, th);
719         if (rc)
720                 GOTO(out, rc);
721         th->th_sync = 1;
722
723         dt_version_set(env, obj, ver, th);
724         EXIT;
725 out:
726         dt_trans_stop(env, dev, th);
727         return rc;
728 }
729
730 /*
731  * Write a global record
732  *
733  * \param env - is the environment passed by the caller
734  * \param obj - is the on-disk global index to be updated
735  * \param id  - index to be updated
736  * \param rec - record to be written
737  */
738 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
739                           __u64 id, struct lquota_glb_rec *rec)
740 {
741         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
742         struct thandle          *th;
743         struct dt_key           *key = (struct dt_key *)&id;
744         int                      rc;
745         ENTRY;
746
747         th = dt_trans_create(env, dev);
748         if (IS_ERR(th))
749                 RETURN(PTR_ERR(th));
750
751         /* the entry with 0 key can always be found in IAM file. */
752         if (id == 0) {
753                 rc = dt_declare_delete(env, obj, key, th);
754                 if (rc)
755                         GOTO(out, rc);
756         }
757
758         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
759         if (rc)
760                 GOTO(out, rc);
761
762         rc = dt_trans_start_local(env, dev, th);
763         if (rc)
764                 GOTO(out, rc);
765
766         dt_write_lock(env, obj, 0);
767
768         if (id == 0) {
769                 struct lquota_glb_rec *tmp;
770
771                 OBD_ALLOC_PTR(tmp);
772                 if (tmp == NULL)
773                         GOTO(out_lock, rc = -ENOMEM);
774
775                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key);
776
777                 OBD_FREE_PTR(tmp);
778                 if (rc == 0) {
779                         rc = dt_delete(env, obj, key, th);
780                         if (rc)
781                                 GOTO(out_lock, rc);
782                 }
783                 rc = 0;
784         }
785
786         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th, 1);
787 out_lock:
788         dt_write_unlock(env, obj);
789 out:
790         dt_trans_stop(env, dev, th);
791         RETURN(rc);
792 }