Whamcloud - gitweb
LU-11775 obdclass: protect imp_sec using rwlock_t
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * The disk API is used by both the QMT and QSD to access/update on-disk index
33  * files. The API consists of the following functions:
34  *
35  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
36  *                                found.
37  * - lquota_disk_glb_find_create: look-up global index file, create it if not
38  *                                found.
39  * - lquota_disk_slv_find:        look-up a slave index file.
40  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
41  *                                required and create the index file on disk if
42  *                                it does not exist.
43  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
44  * - lquota_disk_read:            read quota settings from an index file
45  * - lquota_disk_declare_write:   reserve credits to update a record in an index
46  *                                file
47  * - lquota_disk_write:           update a record in an index file
48  * - lquota_disk_update_ver:      update version of an index file
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include <obd_class.h>
54 #include "lquota_internal.h"
55
56 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
57
58 /*
59  * Helper function looking up & creating if not found an index file with a
60  * dynamic fid.
61  */
62 static struct dt_object *
63 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
64                         struct dt_object *parent, struct lu_fid *fid,
65                         const struct dt_index_features *idx_feat,
66                         char *name)
67 {
68         struct lquota_thread_info *qti = lquota_info(env);
69         struct dt_object *obj;
70         struct local_oid_storage *los;
71         int rc;
72         ENTRY;
73
74         /* Set up local storage */
75         rc = local_oid_storage_init(env, dev, fid, &los);
76         if (rc)
77                 RETURN(ERR_PTR(rc));
78
79         /* lookup/create slave index file */
80         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
81                                          idx_feat);
82         if (IS_ERR(obj))
83                 GOTO(out, obj);
84
85         /* local_oid_storage_fini() will finalize the local storage device,
86          * we have to open the object in another device stack */
87         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
88         dt_object_put_nocache(env, obj);
89         obj = dt_locate(env, dev, &qti->qti_fid);
90         if (IS_ERR(obj))
91                 GOTO(out, obj);
92 out:
93         local_oid_storage_fini(env, los);
94         RETURN(obj);
95 }
96
97 /*
98  * helper function to generate the filename associated with a slave index file
99  */
100 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
101                                            struct obd_uuid *uuid,
102                                            char *filename)
103 {
104         char    *name, *uuid_str;
105
106         /* In most case, the uuid is NULL terminated */
107         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
108                 OBD_ALLOC(uuid_str, sizeof(*uuid));
109                 if (uuid_str == NULL)
110                         RETURN(-ENOMEM);
111                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
112         } else {
113                 uuid_str = (char *)uuid->uuid;
114         }
115
116         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
117          * the filesystem name in case this one is changed in the future */
118         name = strrchr(uuid_str, '-');
119         if (name == NULL) {
120                 name = strrchr(uuid_str, ':');
121                 if (name == NULL) {
122                         CERROR("Failed to extract extract filesystem "
123                                "name from UUID %s\n", uuid_str);
124                         if (uuid_str != uuid->uuid)
125                                 OBD_FREE(uuid_str, sizeof(*uuid));
126                         return -EINVAL;
127                 }
128         }
129         name++;
130
131         /* the filename is composed of the most signicant bits of the global
132          * FID, that's to say the oid which encodes the pool id, pool type and
133          * quota type, followed by the export UUID */
134         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
135
136         if (uuid_str != uuid->uuid)
137                 OBD_FREE(uuid_str, sizeof(*uuid));
138
139         return 0;
140 }
141
142 /*
143  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
144  * QSD instance. This function is also used to create per-pool directory on
145  * the quota master.
146  * The directory is created with a local sequence if it does not exist already.
147  * This function is called at ->ldo_prepare time when the full device stack is
148  * configured.
149  *
150  * \param env  - is the environment passed by the caller
151  * \param dev  - is the dt_device where to create the quota directory
152  * \param parent  - is the parent directory. If not specified, the directory
153  *                  will be created under the root directory
154  * \param name - is the name of quota directory to be created
155  *
156  * \retval     - pointer to quota root dt_object on success, appropriate error
157  *               on failure
158  */
159 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
160                                               struct dt_device *dev,
161                                               struct dt_object *parent,
162                                               const char *name)
163 {
164         struct lquota_thread_info       *qti = lquota_info(env);
165         struct dt_object                *qt_dir = NULL;
166         struct local_oid_storage        *los = NULL;
167         int                              rc;
168         ENTRY;
169
170         /* Set up local storage to create the quota directory.
171          * We use the sequence reserved for local named objects */
172         lu_local_name_obj_fid(&qti->qti_fid, 1);
173         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
174         if (rc)
175                 RETURN(ERR_PTR(rc));
176
177         if (parent == NULL) {
178                 /* Fetch dt object associated with root directory */
179                 rc = dt_root_get(env, dev, &qti->qti_fid);
180                 if (rc)
181                         GOTO(out, rc);
182
183                 parent = dt_locate_at(env, dev, &qti->qti_fid,
184                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
185                 if (IS_ERR(parent))
186                         GOTO(out, rc = PTR_ERR(parent));
187         } else {
188                 lu_object_get(&parent->do_lu);
189         }
190
191         /* create quota directory to be used for all quota index files */
192         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
193                                            S_IRUGO | S_IWUSR | S_IXUGO);
194         if (IS_ERR(qt_dir))
195                 GOTO(out, rc = PTR_ERR(qt_dir));
196
197         /* local_oid_storage_fini() will finalize the local storage device,
198          * we have to open the object in another device stack */
199         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
200         dt_object_put_nocache(env, qt_dir);
201         qt_dir = dt_locate(env, dev, &qti->qti_fid);
202         if (IS_ERR(qt_dir))
203                 GOTO(out, rc = PTR_ERR(qt_dir));
204
205         if (!dt_try_as_dir(env, qt_dir))
206                 GOTO(out, rc = -ENOTDIR);
207         EXIT;
208 out:
209         if (parent != NULL && !IS_ERR(parent))
210                 dt_object_put(env, parent);
211         if (los != NULL)
212                 local_oid_storage_fini(env, los);
213         if (rc) {
214                 if (qt_dir != NULL && !IS_ERR(qt_dir))
215                         dt_object_put(env, qt_dir);
216                 qt_dir = ERR_PTR(rc);
217         }
218         return qt_dir;
219 }
220
221 /*
222  * Look-up/create a global index file.
223  *
224  * \param env - is the environment passed by the caller
225  * \parap dev - is the dt_device where to lookup/create the global index file
226  * \param parent - is the parent directory where to create the global index if
227  *                 not found
228  * \param fid - is the fid of the global index to be looked up/created
229  * \parap local - indicates whether the index should be created with a local
230  *                generated fid or with \fid
231  *
232  * \retval     - pointer to the dt_object of the global index on success,
233  *               appropriate error on failure
234  */
235 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
236                                               struct dt_device *dev,
237                                               struct dt_object *parent,
238                                               struct lu_fid *fid, bool local)
239 {
240         struct lquota_thread_info       *qti = lquota_info(env);
241         struct dt_object                *glb_idx;
242         const struct dt_index_features  *idx_feat;
243         ENTRY;
244
245         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
246                local ? "local " : "", PFID(fid));
247
248         idx_feat = &dt_quota_glb_features;
249
250         /* the filename is composed of the most signicant bits of the FID,
251          * that's to say the oid which encodes the pool id, pool type and quota
252          * type */
253         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
254
255         if (local) {
256                 /* We use the sequence reserved for local named objects */
257                 lu_local_name_obj_fid(&qti->qti_fid, 1);
258                 glb_idx = lquota_disk_find_create(env, dev, parent,
259                                                   &qti->qti_fid, idx_feat,
260                                                   qti->qti_buf);
261         } else {
262                 /* look-up/create global index on disk */
263                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
264                                                               parent,
265                                                               qti->qti_buf,
266                                                               LQUOTA_MODE,
267                                                               idx_feat);
268         }
269
270         if (IS_ERR(glb_idx)) {
271                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
272                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
273                        PFID(fid), PTR_ERR(glb_idx), local);
274                 RETURN(glb_idx);
275         }
276
277         /* install index operation vector */
278         if (glb_idx->do_index_ops == NULL) {
279                 int rc;
280
281                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
282                 if (rc) {
283                         CERROR("%s: failed to setup index operations for "DFID
284                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
285                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
286                         dt_object_put(env, glb_idx);
287                         glb_idx = ERR_PTR(rc);
288                 }
289         }
290
291         RETURN(glb_idx);
292 }
293
294 /*
295  * Look-up a slave index file.
296  *
297  * \param env - is the environment passed by the caller
298  * \param dev - is the backend dt_device where to look-up/create the slave index
299  * \param parent - is the parent directory where to lookup the slave index
300  * \param glb_fid - is the fid of the global index file associated with this
301  *                  slave index.
302  * \param uuid    - is the uuid of slave which is (re)connecting to the master
303  *                  target
304  *
305  * \retval     - pointer to the dt_object of the slave index on success,
306  *               appropriate error on failure
307  */
308 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
309                                        struct dt_device *dev,
310                                        struct dt_object *parent,
311                                        const struct lu_fid *glb_fid,
312                                        struct obd_uuid *uuid)
313 {
314         struct lquota_thread_info       *qti = lquota_info(env);
315         struct dt_object                *slv_idx;
316         int                              rc;
317         ENTRY;
318
319         LASSERT(uuid != NULL);
320
321         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
322                obd_uuid2str(uuid));
323
324         /* generate filename associated with the slave */
325         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
326         if (rc)
327                 RETURN(ERR_PTR(rc));
328
329         /* lookup slave index file */
330         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
331         if (rc)
332                 RETURN(ERR_PTR(rc));
333
334         /* name is found, get the object */
335         slv_idx = dt_locate(env, dev, &qti->qti_fid);
336         if (IS_ERR(slv_idx))
337                 RETURN(slv_idx);
338
339         if (slv_idx->do_index_ops == NULL) {
340                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
341                                                    &dt_quota_slv_features);
342                 if (rc) {
343                         CERROR("%s: failed to setup slave index operations for "
344                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
345                                obd_uuid2str(uuid), rc);
346                         dt_object_put(env, slv_idx);
347                         slv_idx = ERR_PTR(rc);
348                 }
349         }
350
351         RETURN(slv_idx);
352 }
353
354 /*
355  * Look-up a slave index file. If the slave index isn't found:
356  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
357  *   create the index.
358  * - otherwise, we create the index file with a local reserved FID (see
359  *   lquota_local_oid)
360  *
361  * \param env - is the environment passed by the caller
362  * \param dev - is the backend dt_device where to look-up/create the slave index
363  * \param parent - is the parent directory where to create the slave index if
364  *                 it does not exist already
365  * \param glb_fid - is the fid of the global index file associated with this
366  *                  slave index.
367  * \param uuid    - is the uuid of slave which is (re)connecting to the master
368  *                  target
369  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
370  *                  & LQUOTA_GRP_OID) for the slave index creation or to
371  *                  allocate a new fid from sequence FID_SEQ_QUOTA
372  *
373  * \retval     - pointer to the dt_object of the slave index on success,
374  *               appropriate error on failure
375  */
376 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
377                                               struct dt_device *dev,
378                                               struct dt_object *parent,
379                                               struct lu_fid *glb_fid,
380                                               struct obd_uuid *uuid,
381                                               bool local)
382 {
383         struct lquota_thread_info       *qti = lquota_info(env);
384         struct dt_object                *slv_idx;
385         int                              rc, type;
386         ENTRY;
387
388         LASSERT(uuid != NULL);
389
390         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
391                obd_uuid2str(uuid));
392
393         /* generate filename associated with the slave */
394         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
395         if (rc)
396                 RETURN(ERR_PTR(rc));
397
398         if (lu_device_is_md(dev->dd_lu_dev.ld_site->ls_top_dev))
399                 type = LDD_F_SV_TYPE_MDT;
400         else
401                 type = LDD_F_SV_TYPE_OST;
402
403         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
404          * through the network */
405         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
406         qti->qti_fid.f_ver = 0;
407         if (local) {
408                 int pool_type, qtype;
409
410                 rc = lquota_extract_fid(glb_fid, NULL, &pool_type, &qtype);
411                 if (rc)
412                         RETURN(ERR_PTR(rc));
413
414                 /* use predefined fid in the reserved oid list */
415                 if ((type == LDD_F_SV_TYPE_MDT && pool_type == LQUOTA_RES_MD) ||
416                     (type == LDD_F_SV_TYPE_OST && pool_type == LQUOTA_RES_DT))
417                         qti->qti_fid.f_oid = qtype2slv_oid(qtype);
418                 else
419                         qti->qti_fid.f_oid = pool_type << 16 |
420                                                         qtype2slv_oid(qtype);
421
422                 slv_idx = local_index_find_or_create_with_fid(env, dev,
423                                                               &qti->qti_fid,
424                                                               parent,
425                                                               qti->qti_buf,
426                                                               LQUOTA_MODE,
427                                                         &dt_quota_slv_features);
428         } else {
429                 /* allocate fid dynamically if index does not exist already */
430                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
431
432                 /* lookup/create slave index file */
433                 slv_idx = lquota_disk_find_create(env, dev, parent,
434                                                   &qti->qti_fid,
435                                                   &dt_quota_slv_features,
436                                                   qti->qti_buf);
437         }
438
439         if (IS_ERR(slv_idx))
440                 RETURN(slv_idx);
441
442         /* install index operation vector */
443         if (slv_idx->do_index_ops == NULL) {
444                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
445                                                    &dt_quota_slv_features);
446                 if (rc) {
447                         CERROR("%s: failed to setup index operations for "DFID
448                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
449                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
450                         dt_object_put(env, slv_idx);
451                         slv_idx = ERR_PTR(rc);
452                 }
453         }
454
455         RETURN(slv_idx);
456 }
457
458 /*
459  * Iterate over all slave index files associated with global index \glb_fid and
460  * invoke a callback function for each slave index file.
461  *
462  * \param env     - is the environment passed by the caller
463  * \param parent  - is the parent directory where the slave index files are
464  *                  stored
465  * \param glb_fid - is the fid of the global index file associated with the
466  *                  slave indexes to scan
467  * \param func    - is the callback function to call each time a slave index
468  *                  file is found
469  * \param arg     - is an opaq argument passed to the callback function \func
470  */
471 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
472                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
473                              void *arg)
474 {
475         struct lquota_thread_info       *qti = lquota_info(env);
476         struct dt_it                    *it;
477         const struct dt_it_ops          *iops;
478         char                            *name;
479         int                              rc;
480         ENTRY;
481
482         OBD_ALLOC(name, LQUOTA_NAME_MAX);
483         if (name == NULL)
484                 RETURN(-ENOMEM);
485
486         /* filename associated with slave index files are prefixed with the most
487          * signicant bits of the global FID */
488         sprintf(name, "0x%x-", glb_fid->f_oid);
489
490         iops = &parent->do_index_ops->dio_it;
491         it = iops->init(env, parent, 0);
492         if (IS_ERR(it)) {
493                 OBD_FREE(name, LQUOTA_NAME_MAX);
494                 RETURN(PTR_ERR(it));
495         }
496
497         rc = iops->load(env, it, 0);
498         if (rc == 0) {
499                 /*
500                  * Iterator didn't find record with exactly the key requested.
501                  *
502                  * It is currently either
503                  *
504                  *     - positioned above record with key less than
505                  *     requested---skip it.
506                  *
507                  *     - or not positioned at all (is in IAM_IT_SKEWED
508                  *     state)---position it on the next item.
509                  */
510                 rc = iops->next(env, it);
511         } else if (rc > 0)
512                 rc = 0;
513
514         while (rc == 0) {
515                 struct dt_key   *key;
516                 int              len;
517
518                 len = iops->key_size(env, it);
519                 /* IAM iterator can return record with zero len. */
520                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
521                         goto next;
522
523                 key = iops->key(env, it);
524                 if (IS_ERR(key)) {
525                         rc = PTR_ERR(key);
526                         break;
527                 }
528
529                 if (strncmp((char *)key, name, strlen(name)) != 0)
530                         goto next;
531
532                 /* ldiskfs OSD returns filename as stored in directory entry
533                  * which does not end up with '\0' */
534                 memcpy(&qti->qti_buf, key, len);
535                 qti->qti_buf[len] = '\0';
536
537                 /* lookup fid associated with this slave index file */
538                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
539                 if (rc)
540                         break;
541
542                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
543                         goto next;
544
545                 rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
546                 if (rc)
547                         break;
548 next:
549                 do {
550                         rc = iops->next(env, it);
551                 } while (rc == -ESTALE);
552         }
553
554         iops->put(env, it);
555         iops->fini(env, it);
556         OBD_FREE(name, LQUOTA_NAME_MAX);
557         if (rc > 0)
558                 rc = 0;
559         RETURN(rc);
560 }
561
562 /*
563  * Retrieve quota settings from disk for a particular identifier.
564  *
565  * \param env - is the environment passed by the caller
566  * \param obj - is the on-disk index where quota settings are stored.
567  * \param id  - is the key to be updated
568  * \param rec - is the output record where to store quota settings.
569  *
570  * \retval    - 0 on success, appropriate error on failure
571  */
572 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
573                      union lquota_id *id, struct dt_rec *rec)
574 {
575         int     rc;
576         ENTRY;
577
578         LASSERT(dt_object_exists(obj));
579         LASSERT(obj->do_index_ops != NULL);
580
581         /* lookup on-disk record from index file */
582         dt_read_lock(env, obj, 0);
583         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid);
584         dt_read_unlock(env, obj);
585
586         RETURN(rc);
587 }
588
589 /*
590  * Reserve enough credits to update a record in a quota index file.
591  *
592  * \param env - is the environment passed by the caller
593  * \param th  - is the transaction to use for disk writes
594  * \param obj - is the on-disk index where quota settings are stored.
595  * \param id  - is the key to be updated
596  *
597  * \retval    - 0 on success, appropriate error on failure
598  */
599 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
600                               struct dt_object *obj, union lquota_id *id)
601 {
602         struct lquota_thread_info       *qti = lquota_info(env);
603         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
604         int                              rc;
605         ENTRY;
606
607         LASSERT(dt_object_exists(obj));
608         LASSERT(obj->do_index_ops != NULL);
609
610         /* speculative delete declaration in case there is already an existing
611          * record in the index */
612         rc = dt_declare_delete(env, obj, key, th);
613         if (rc)
614                 RETURN(rc);
615
616         /* declare insertion of updated record */
617         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
618                                th);
619         if (rc)
620                 RETURN(rc);
621
622         /* we might have to update the version of the global index too */
623         rc = dt_declare_version_set(env, obj, th);
624
625         RETURN(rc);
626 }
627
628 /*
629  * Update a record in a quota index file.
630  *
631  * \param env - is the environment passed by the caller
632  * \param th  - is the transaction to use for disk writes
633  * \param obj - is the on-disk index to be updated.
634  * \param id  - is the key to be updated
635  * \param rec - is the input record containing the new quota settings.
636  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
637  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
638  *                used to return the new version of the index when
639  *                LQUOTA_BUMP_VER is set.
640  *
641  * \retval    - 0 on success, appropriate error on failure
642  */
643 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
644                       struct dt_object *obj, union lquota_id *id,
645                       struct dt_rec *rec, __u32 flags, __u64 *ver)
646 {
647         struct lquota_thread_info       *qti = lquota_info(env);
648         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
649         int                              rc;
650         ENTRY;
651
652         LASSERT(dt_object_exists(obj));
653         LASSERT(obj->do_index_ops != NULL);
654
655         /* lock index */
656         dt_write_lock(env, obj, 0);
657
658         /* check whether there is already an existing record for this ID */
659         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key);
660         if (rc == 0) {
661                 /* delete existing record in order to replace it */
662                 rc = dt_delete(env, obj, key, th);
663                 if (rc)
664                         GOTO(out, rc);
665         } else if (rc == -ENOENT) {
666                 /* probably first insert */
667                 rc = 0;
668         } else {
669                 GOTO(out, rc);
670         }
671
672         if (rec != NULL) {
673                 /* insert record with updated quota settings */
674                 rc = dt_insert(env, obj, rec, key, th);
675                 if (rc) {
676                         /* try to insert the old one */
677                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
678                                        key, th);
679                         LASSERTF(rc == 0, "failed to insert record in quota "
680                                  "index "DFID"\n",
681                                  PFID(lu_object_fid(&obj->do_lu)));
682                         GOTO(out, rc);
683                 }
684         }
685
686         if (flags != 0) {
687                 LASSERT(ver);
688                 if (flags & LQUOTA_BUMP_VER) {
689                         /* caller wants to bump the version, let's first read
690                          * it */
691                         *ver = dt_version_get(env, obj);
692                         (*ver)++;
693                 } else {
694                         LASSERT(flags & LQUOTA_SET_VER);
695                 }
696                 dt_version_set(env, obj, *ver, th);
697         }
698
699         EXIT;
700 out:
701         dt_write_unlock(env, obj);
702         return rc;
703 }
704
705 /*
706  * Update version of an index file
707  *
708  * \param env - is the environment passed by the caller
709  * \param dev - is the backend dt device storing the index file
710  * \param obj - is the on-disk index that should be updated
711  * \param ver - is the new version
712  */
713 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
714                            struct dt_object *obj, __u64 ver)
715 {
716         struct thandle  *th;
717         int              rc;
718         ENTRY;
719
720         th = dt_trans_create(env, dev);
721         if (IS_ERR(th))
722                 RETURN(PTR_ERR(th));
723
724         rc = dt_declare_version_set(env, obj, th);
725         if (rc)
726                 GOTO(out, rc);
727
728         rc = dt_trans_start_local(env, dev, th);
729         if (rc)
730                 GOTO(out, rc);
731         th->th_sync = 1;
732
733         dt_version_set(env, obj, ver, th);
734         EXIT;
735 out:
736         dt_trans_stop(env, dev, th);
737         return rc;
738 }
739
740 /*
741  * Write a global record
742  *
743  * \param env - is the environment passed by the caller
744  * \param obj - is the on-disk global index to be updated
745  * \param id  - index to be updated
746  * \param rec - record to be written
747  */
748 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
749                           __u64 id, struct lquota_glb_rec *rec)
750 {
751         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
752         struct thandle          *th;
753         struct dt_key           *key = (struct dt_key *)&id;
754         int                      rc;
755         ENTRY;
756
757         th = dt_trans_create(env, dev);
758         if (IS_ERR(th))
759                 RETURN(PTR_ERR(th));
760
761         /* the entry with 0 key can always be found in IAM file. */
762         if (id == 0) {
763                 rc = dt_declare_delete(env, obj, key, th);
764                 if (rc)
765                         GOTO(out, rc);
766         }
767
768         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
769         if (rc)
770                 GOTO(out, rc);
771
772         rc = dt_trans_start_local(env, dev, th);
773         if (rc)
774                 GOTO(out, rc);
775
776         dt_write_lock(env, obj, 0);
777
778         if (id == 0) {
779                 struct lquota_glb_rec *tmp;
780
781                 OBD_ALLOC_PTR(tmp);
782                 if (tmp == NULL)
783                         GOTO(out_lock, rc = -ENOMEM);
784
785                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key);
786
787                 OBD_FREE_PTR(tmp);
788                 if (rc == 0) {
789                         rc = dt_delete(env, obj, key, th);
790                         if (rc)
791                                 GOTO(out_lock, rc);
792                 }
793                 rc = 0;
794         }
795
796         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th);
797 out_lock:
798         dt_write_unlock(env, obj);
799 out:
800         dt_trans_stop(env, dev, th);
801         RETURN(rc);
802 }