Whamcloud - gitweb
LU-4871 newline: Correct missing newline
[fs/lustre-release.git] / lustre / quota / lquota_disk.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2013, Intel Corporation.
25  * Use is subject to license terms.
26  *
27  * Author: Johann Lombardi <johann.lombardi@intel.com>
28  * Author: Niu    Yawei    <yawei.niu@intel.com>
29  */
30
31 /*
32  * The disk API is used by both the QMT and QSD to access/update on-disk index
33  * files. The API consists of the following functions:
34  *
35  * - lquota_disk_dir_find_create: look-up quota directory, create it if not
36  *                                found.
37  * - lquota_disk_glb_find_create: look-up global index file, create it if not
38  *                                found.
39  * - lquota_disk_slv_find:        look-up a slave index file.
40  * - lquota_disk_slv_find_create: look-up a slave index file. Allocate a FID if
41  *                                required and create the index file on disk if
42  *                                it does not exist.
43  * - lquota_disk_for_each_slv:    iterate over all existing slave index files
44  * - lquota_disk_read:            read quota settings from a index file
45  * - lquota_disk_declare_write:   reserve credits to update a record in an index
46  *                                file
47  * - lquota_disk_write:           update a record in an index file
48  * - lquota_disk_update_ver:      update version of an index file
49  */
50
51 #define DEBUG_SUBSYSTEM S_LQUOTA
52
53 #include "lquota_internal.h"
54
55 #define LQUOTA_MODE (S_IFREG | S_IRUGO | S_IWUSR)
56
57 /*
58  * Helper function looking up & creating if not found an index file with a
59  * dynamic fid.
60  */
61 static struct dt_object *
62 lquota_disk_find_create(const struct lu_env *env, struct dt_device *dev,
63                         struct dt_object *parent, struct lu_fid *fid,
64                         const struct dt_index_features *idx_feat,
65                         char *name)
66 {
67         struct lquota_thread_info       *qti = lquota_info(env);
68         struct dt_object                *obj;
69         struct local_oid_storage        *los;
70         int                              rc;
71         ENTRY;
72
73         /* Set up local storage */
74         rc = local_oid_storage_init(env, dev, fid, &los);
75         if (rc)
76                 RETURN(ERR_PTR(rc));
77
78         /* lookup/create slave index file */
79         obj = local_index_find_or_create(env, los, parent, name, LQUOTA_MODE,
80                                          idx_feat);
81         if (IS_ERR(obj))
82                 GOTO(out, obj);
83
84         /* local_oid_storage_fini() will finalize the local storage device,
85          * we have to open the object in another device stack */
86         qti->qti_fid = obj->do_lu.lo_header->loh_fid;
87         lu_object_put_nocache(env, &obj->do_lu);
88         obj = dt_locate(env, dev, &qti->qti_fid);
89         if (IS_ERR(obj))
90                 GOTO(out, obj);
91 out:
92         local_oid_storage_fini(env, los);
93         RETURN(obj);
94 }
95
96 /*
97  * helper function to generate the filename associated with a slave index file
98  */
99 static inline int lquota_disk_slv_filename(const struct lu_fid *glb_fid,
100                                            struct obd_uuid *uuid,
101                                            char *filename)
102 {
103         char    *name, *uuid_str;
104
105         /* In most case, the uuid is NULL terminated */
106         if (uuid->uuid[sizeof(*uuid) - 1] != '\0') {
107                 OBD_ALLOC(uuid_str, sizeof(*uuid));
108                 if (uuid_str == NULL)
109                         RETURN(-ENOMEM);
110                 memcpy(uuid_str, uuid->uuid, sizeof(*uuid) - 1);
111         } else {
112                 uuid_str = (char *)uuid->uuid;
113         }
114
115         /* we strip the slave's UUID (in the form of fsname-OST0001_UUID) of
116          * the filesystem name in case this one is changed in the future */
117         name = strrchr(uuid_str, '-');
118         if (name == NULL) {
119                 name = strrchr(uuid_str, ':');
120                 if (name == NULL) {
121                         CERROR("Failed to extract extract filesystem "
122                                "name from UUID %s\n", uuid_str);
123                         if (uuid_str != uuid->uuid)
124                                 OBD_FREE(uuid_str, sizeof(*uuid));
125                         return -EINVAL;
126                 }
127         }
128         name++;
129
130         /* the filename is composed of the most signicant bits of the global
131          * FID, that's to say the oid which encodes the pool id, pool type and
132          * quota type, followed by the export UUID */
133         sprintf(filename, "0x%x-%s", glb_fid->f_oid, name);
134
135         if (uuid_str != uuid->uuid)
136                 OBD_FREE(uuid_str, sizeof(*uuid));
137
138         return 0;
139 }
140
141 /*
142  * Set up quota directory (either "quota_master" or "quota_slave") for a QMT or
143  * QSD instance. This function is also used to create per-pool directory on
144  * the quota master.
145  * The directory is created with a local sequence if it does not exist already.
146  * This function is called at ->ldo_prepare time when the full device stack is
147  * configured.
148  *
149  * \param env  - is the environment passed by the caller
150  * \param dev  - is the dt_device where to create the quota directory
151  * \param parent  - is the parent directory. If not specified, the directory
152  *                  will be created under the root directory
153  * \param name - is the name of quota directory to be created
154  *
155  * \retval     - pointer to quota root dt_object on success, appropriate error
156  *               on failure
157  */
158 struct dt_object *lquota_disk_dir_find_create(const struct lu_env *env,
159                                               struct dt_device *dev,
160                                               struct dt_object *parent,
161                                               const char *name)
162 {
163         struct lquota_thread_info       *qti = lquota_info(env);
164         struct dt_object                *qt_dir = NULL;
165         struct local_oid_storage        *los = NULL;
166         int                              rc;
167         ENTRY;
168
169         /* Set up local storage to create the quota directory.
170          * We use the sequence reserved for local named objects */
171         lu_local_name_obj_fid(&qti->qti_fid, 1);
172         rc = local_oid_storage_init(env, dev, &qti->qti_fid, &los);
173         if (rc)
174                 RETURN(ERR_PTR(rc));
175
176         if (parent == NULL) {
177                 /* Fetch dt object associated with root directory */
178                 rc = dt_root_get(env, dev, &qti->qti_fid);
179                 if (rc)
180                         GOTO(out, rc);
181
182                 parent = dt_locate_at(env, dev, &qti->qti_fid,
183                                       dev->dd_lu_dev.ld_site->ls_top_dev, NULL);
184                 if (IS_ERR(parent))
185                         GOTO(out, rc = PTR_ERR(parent));
186         } else {
187                 lu_object_get(&parent->do_lu);
188         }
189
190         /* create quota directory to be used for all quota index files */
191         qt_dir = local_file_find_or_create(env, los, parent, name, S_IFDIR |
192                                            S_IRUGO | S_IWUSR | S_IXUGO);
193         if (IS_ERR(qt_dir))
194                 GOTO(out, rc = PTR_ERR(qt_dir));
195
196         /* local_oid_storage_fini() will finalize the local storage device,
197          * we have to open the object in another device stack */
198         qti->qti_fid = qt_dir->do_lu.lo_header->loh_fid;
199         lu_object_put_nocache(env, &qt_dir->do_lu);
200         qt_dir = dt_locate(env, dev, &qti->qti_fid);
201         if (IS_ERR(qt_dir))
202                 GOTO(out, rc = PTR_ERR(qt_dir));
203
204         if (!dt_try_as_dir(env, qt_dir))
205                 GOTO(out, rc = -ENOTDIR);
206         EXIT;
207 out:
208         if (parent != NULL && !IS_ERR(parent))
209                 lu_object_put(env, &parent->do_lu);
210         if (los != NULL)
211                 local_oid_storage_fini(env, los);
212         if (rc) {
213                 if (qt_dir != NULL && !IS_ERR(qt_dir))
214                         lu_object_put(env, &qt_dir->do_lu);
215                 qt_dir = ERR_PTR(rc);
216         }
217         return qt_dir;
218 }
219
220 /*
221  * Look-up/create a global index file.
222  *
223  * \param env - is the environment passed by the caller
224  * \parap dev - is the dt_device where to lookup/create the global index file
225  * \param parent - is the parent directory where to create the global index if
226  *                 not found
227  * \param fid - is the fid of the global index to be looked up/created
228  * \parap local - indicates whether the index should be created with a local
229  *                generated fid or with \fid
230  *
231  * \retval     - pointer to the dt_object of the global index on success,
232  *               appropriate error on failure
233  */
234 struct dt_object *lquota_disk_glb_find_create(const struct lu_env *env,
235                                               struct dt_device *dev,
236                                               struct dt_object *parent,
237                                               struct lu_fid *fid, bool local)
238 {
239         struct lquota_thread_info       *qti = lquota_info(env);
240         struct dt_object                *glb_idx;
241         const struct dt_index_features  *idx_feat;
242         ENTRY;
243
244         CDEBUG(D_QUOTA, "look-up/create %sglobal idx file ("DFID")\n",
245                local ? "local " : "", PFID(fid));
246
247 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0)
248         /* we use different index feature for each quota type and target type
249          * for the time being. This is done for on-disk conversion from the old
250          * quota format. Once this is no longer required, we should just be
251          * using dt_quota_glb_features for all global index file */
252         idx_feat = glb_idx_feature(fid);
253 #else
254 #warning "remove old quota compatibility code"
255         idx_feat = &dt_quota_glb_features;
256 #endif
257
258         /* the filename is composed of the most signicant bits of the FID,
259          * that's to say the oid which encodes the pool id, pool type and quota
260          * type */
261         sprintf(qti->qti_buf, "0x%x", fid->f_oid);
262
263         if (local) {
264                 /* We use the sequence reserved for local named objects */
265                 lu_local_name_obj_fid(&qti->qti_fid, 1);
266                 glb_idx = lquota_disk_find_create(env, dev, parent,
267                                                   &qti->qti_fid, idx_feat,
268                                                   qti->qti_buf);
269         } else {
270                 /* look-up/create global index on disk */
271                 glb_idx = local_index_find_or_create_with_fid(env, dev, fid,
272                                                               parent,
273                                                               qti->qti_buf,
274                                                               LQUOTA_MODE,
275                                                               idx_feat);
276         }
277
278         if (IS_ERR(glb_idx)) {
279                 CERROR("%s: failed to look-up/create idx file "DFID" rc:%ld "
280                        "local:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
281                        PFID(fid), PTR_ERR(glb_idx), local);
282                 RETURN(glb_idx);
283         }
284
285         /* install index operation vector */
286         if (glb_idx->do_index_ops == NULL) {
287                 int rc;
288
289                 rc = glb_idx->do_ops->do_index_try(env, glb_idx, idx_feat);
290                 if (rc) {
291                         CERROR("%s: failed to setup index operations for "DFID
292                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
293                                PFID(lu_object_fid(&glb_idx->do_lu)), rc);
294                         lu_object_put(env, &glb_idx->do_lu);
295                         glb_idx = ERR_PTR(rc);
296                 }
297         }
298
299         RETURN(glb_idx);
300 }
301
302 /*
303  * Look-up a slave index file.
304  *
305  * \param env - is the environment passed by the caller
306  * \param dev - is the backend dt_device where to look-up/create the slave index
307  * \param parent - is the parent directory where to lookup the slave index
308  * \param glb_fid - is the fid of the global index file associated with this
309  *                  slave index.
310  * \param uuid    - is the uuid of slave which is (re)connecting to the master
311  *                  target
312  *
313  * \retval     - pointer to the dt_object of the slave index on success,
314  *               appropriate error on failure
315  */
316 struct dt_object *lquota_disk_slv_find(const struct lu_env *env,
317                                        struct dt_device *dev,
318                                        struct dt_object *parent,
319                                        const struct lu_fid *glb_fid,
320                                        struct obd_uuid *uuid)
321 {
322         struct lquota_thread_info       *qti = lquota_info(env);
323         struct dt_object                *slv_idx;
324         int                              rc;
325         ENTRY;
326
327         LASSERT(uuid != NULL);
328
329         CDEBUG(D_QUOTA, "lookup slave index file for %s\n",
330                obd_uuid2str(uuid));
331
332         /* generate filename associated with the slave */
333         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
334         if (rc)
335                 RETURN(ERR_PTR(rc));
336
337         /* lookup slave index file */
338         rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
339         if (rc)
340                 RETURN(ERR_PTR(rc));
341
342         /* name is found, get the object */
343         slv_idx = dt_locate(env, dev, &qti->qti_fid);
344         if (IS_ERR(slv_idx))
345                 RETURN(slv_idx);
346
347         if (slv_idx->do_index_ops == NULL) {
348                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
349                                                    &dt_quota_slv_features);
350                 if (rc) {
351                         CERROR("%s: failed to setup slave index operations for "
352                                "%s, rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
353                                obd_uuid2str(uuid), rc);
354                         lu_object_put(env, &slv_idx->do_lu);
355                         slv_idx = ERR_PTR(rc);
356                 }
357         }
358
359         RETURN(slv_idx);
360 }
361
362 /*
363  * Look-up a slave index file. If the slave index isn't found:
364  * - if local is set to false, we allocate a FID from FID_SEQ_QUOTA sequence and
365  *   create the index.
366  * - otherwise, we create the index file with a local reserved FID (see
367  *   lquota_local_oid)
368  *
369  * \param env - is the environment passed by the caller
370  * \param dev - is the backend dt_device where to look-up/create the slave index
371  * \param parent - is the parent directory where to create the slave index if
372  *                 it does not exist already
373  * \param glb_fid - is the fid of the global index file associated with this
374  *                  slave index.
375  * \param uuid    - is the uuid of slave which is (re)connecting to the master
376  *                  target
377  * \param local   - indicate whether to use local reserved FID (LQUOTA_USR_OID
378  *                  & LQUOTA_GRP_OID) for the slave index creation or to
379  *                  allocate a new fid from sequence FID_SEQ_QUOTA
380  *
381  * \retval     - pointer to the dt_object of the slave index on success,
382  *               appropriate error on failure
383  */
384 struct dt_object *lquota_disk_slv_find_create(const struct lu_env *env,
385                                               struct dt_device *dev,
386                                               struct dt_object *parent,
387                                               struct lu_fid *glb_fid,
388                                               struct obd_uuid *uuid,
389                                               bool local)
390 {
391         struct lquota_thread_info       *qti = lquota_info(env);
392         struct dt_object                *slv_idx;
393         int                              rc;
394         ENTRY;
395
396         LASSERT(uuid != NULL);
397
398         CDEBUG(D_QUOTA, "lookup/create slave index file for %s\n",
399                obd_uuid2str(uuid));
400
401         /* generate filename associated with the slave */
402         rc = lquota_disk_slv_filename(glb_fid, uuid, qti->qti_buf);
403         if (rc)
404                 RETURN(ERR_PTR(rc));
405
406         /* Slave indexes uses the FID_SEQ_QUOTA sequence since they can be read
407          * through the network */
408         qti->qti_fid.f_seq = FID_SEQ_QUOTA;
409         qti->qti_fid.f_ver = 0;
410         if (local) {
411                 int type;
412
413                 rc = lquota_extract_fid(glb_fid, NULL, NULL, &type);
414                 if (rc)
415                         RETURN(ERR_PTR(rc));
416
417                 /* use predefined fid in the reserved oid list */
418                 qti->qti_fid.f_oid = (type == USRQUOTA) ? LQUOTA_USR_OID
419                                                         : LQUOTA_GRP_OID;
420
421                 slv_idx = local_index_find_or_create_with_fid(env, dev,
422                                                               &qti->qti_fid,
423                                                               parent,
424                                                               qti->qti_buf,
425                                                               LQUOTA_MODE,
426                                                         &dt_quota_slv_features);
427         } else {
428                 /* allocate fid dynamically if index does not exist already */
429                 qti->qti_fid.f_oid = LQUOTA_GENERATED_OID;
430
431                 /* lookup/create slave index file */
432                 slv_idx = lquota_disk_find_create(env, dev, parent,
433                                                   &qti->qti_fid,
434                                                   &dt_quota_slv_features,
435                                                   qti->qti_buf);
436         }
437
438         if (IS_ERR(slv_idx))
439                 RETURN(slv_idx);
440
441         /* install index operation vector */
442         if (slv_idx->do_index_ops == NULL) {
443                 rc = slv_idx->do_ops->do_index_try(env, slv_idx,
444                                                    &dt_quota_slv_features);
445                 if (rc) {
446                         CERROR("%s: failed to setup index operations for "DFID
447                                " rc:%d\n", dev->dd_lu_dev.ld_obd->obd_name,
448                                PFID(lu_object_fid(&slv_idx->do_lu)), rc);
449                         lu_object_put(env, &slv_idx->do_lu);
450                         slv_idx = ERR_PTR(rc);
451                 }
452         }
453
454         RETURN(slv_idx);
455 }
456
457 /*
458  * Iterate over all slave index files associated with global index \glb_fid and
459  * invoke a callback function for each slave index file.
460  *
461  * \param env     - is the environment passed by the caller
462  * \param parent  - is the parent directory where the slave index files are
463  *                  stored
464  * \param glb_fid - is the fid of the global index file associated with the
465  *                  slave indexes to scan
466  * \param func    - is the callback function to call each time a slave index
467  *                  file is found
468  * \param arg     - is an opaq argument passed to the callback function \func
469  */
470 int lquota_disk_for_each_slv(const struct lu_env *env, struct dt_object *parent,
471                              struct lu_fid *glb_fid, lquota_disk_slv_cb_t func,
472                              void *arg)
473 {
474         struct lquota_thread_info       *qti = lquota_info(env);
475         struct dt_it                    *it;
476         const struct dt_it_ops          *iops;
477         char                            *name;
478         int                              rc;
479         ENTRY;
480
481         OBD_ALLOC(name, LQUOTA_NAME_MAX);
482         if (name == NULL)
483                 RETURN(-ENOMEM);
484
485         /* filename associated with slave index files are prefixed with the most
486          * signicant bits of the global FID */
487         sprintf(name, "0x%x-", glb_fid->f_oid);
488
489         iops = &parent->do_index_ops->dio_it;
490         it = iops->init(env, parent, 0, BYPASS_CAPA);
491         if (IS_ERR(it)) {
492                 OBD_FREE(name, LQUOTA_NAME_MAX);
493                 RETURN(PTR_ERR(it));
494         }
495
496         rc = iops->load(env, it, 0);
497         if (rc == 0) {
498                 /*
499                  * Iterator didn't find record with exactly the key requested.
500                  *
501                  * It is currently either
502                  *
503                  *     - positioned above record with key less than
504                  *     requested---skip it.
505                  *
506                  *     - or not positioned at all (is in IAM_IT_SKEWED
507                  *     state)---position it on the next item.
508                  */
509                 rc = iops->next(env, it);
510         } else if (rc > 0)
511                 rc = 0;
512
513         while (rc == 0) {
514                 struct dt_key   *key;
515                 int              len;
516
517                 len = iops->key_size(env, it);
518                 /* IAM iterator can return record with zero len. */
519                 if (len == 0 || len <= strlen(name) || len >= LQUOTA_NAME_MAX)
520                         goto next;
521
522                 key = iops->key(env, it);
523                 if (IS_ERR(key)) {
524                         rc = PTR_ERR(key);
525                         break;
526                 }
527
528                 if (strncmp((char *)key, name, strlen(name)) != 0)
529                         goto next;
530
531                 /* ldiskfs OSD returns filename as stored in directory entry
532                  * which does not end up with '\0' */
533                 memcpy(&qti->qti_buf, key, len);
534                 qti->qti_buf[len] = '\0';
535
536                 /* lookup fid associated with this slave index file */
537                 rc = dt_lookup_dir(env, parent, qti->qti_buf, &qti->qti_fid);
538                 if (rc)
539                         break;
540
541                 if (qti->qti_fid.f_seq != FID_SEQ_QUOTA)
542                         goto next;
543
544                 rc = func(env, glb_fid, (char *)key, &qti->qti_fid, arg);
545                 if (rc)
546                         break;
547 next:
548                 do {
549                         rc = iops->next(env, it);
550                 } while (rc == -ESTALE);
551         }
552
553         iops->put(env, it);
554         iops->fini(env, it);
555         OBD_FREE(name, LQUOTA_NAME_MAX);
556         if (rc > 0)
557                 rc = 0;
558         RETURN(rc);
559 }
560
561 /*
562  * Retrieve quota settings from disk for a particular identifier.
563  *
564  * \param env - is the environment passed by the caller
565  * \param obj - is the on-disk index where quota settings are stored.
566  * \param id  - is the key to be updated
567  * \param rec - is the output record where to store quota settings.
568  *
569  * \retval    - 0 on success, appropriate error on failure
570  */
571 int lquota_disk_read(const struct lu_env *env, struct dt_object *obj,
572                      union lquota_id *id, struct dt_rec *rec)
573 {
574         int     rc;
575         ENTRY;
576
577         LASSERT(dt_object_exists(obj));
578         LASSERT(obj->do_index_ops != NULL);
579
580         /* lookup on-disk record from index file */
581         dt_read_lock(env, obj, 0);
582         rc = dt_lookup(env, obj, rec, (struct dt_key *)&id->qid_uid,
583                        BYPASS_CAPA);
584         dt_read_unlock(env, obj);
585
586         RETURN(rc);
587 }
588
589 /*
590  * Reserve enough credits to update a record in a quota index file.
591  *
592  * \param env - is the environment passed by the caller
593  * \param th  - is the transaction to use for disk writes
594  * \param obj - is the on-disk index where quota settings are stored.
595  * \param id  - is the key to be updated
596  *
597  * \retval    - 0 on success, appropriate error on failure
598  */
599 int lquota_disk_declare_write(const struct lu_env *env, struct thandle *th,
600                               struct dt_object *obj, union lquota_id *id)
601 {
602         struct lquota_thread_info       *qti = lquota_info(env);
603         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
604         int                              rc;
605         ENTRY;
606
607         LASSERT(dt_object_exists(obj));
608         LASSERT(obj->do_index_ops != NULL);
609
610         /* speculative delete declaration in case there is already an existing
611          * record in the index */
612         rc = dt_declare_delete(env, obj, key, th);
613         if (rc)
614                 RETURN(rc);
615
616         /* declare insertion of updated record */
617         rc = dt_declare_insert(env, obj, (struct dt_rec *)&qti->qti_rec, key,
618                                th);
619         if (rc)
620                 RETURN(rc);
621
622         /* we might have to update the version of the global index too */
623         rc = dt_declare_version_set(env, obj, th);
624
625         RETURN(rc);
626 }
627
628 /*
629  * Update a record in a quota index file.
630  *
631  * \param env - is the environment passed by the caller
632  * \param th  - is the transaction to use for disk writes
633  * \param obj - is the on-disk index to be updated.
634  * \param id  - is the key to be updated
635  * \param rec - is the input record containing the new quota settings.
636  * \param flags - can be LQUOTA_BUMP_VER or LQUOTA_SET_VER.
637  * \param ver   - is the new version of the index if LQUOTA_SET_VER is set or is
638  *                used to return the new version of the index when
639  *                LQUOTA_BUMP_VER is set.
640  *
641  * \retval    - 0 on success, appropriate error on failure
642  */
643 int lquota_disk_write(const struct lu_env *env, struct thandle *th,
644                       struct dt_object *obj, union lquota_id *id,
645                       struct dt_rec *rec, __u32 flags, __u64 *ver)
646 {
647         struct lquota_thread_info       *qti = lquota_info(env);
648         struct dt_key                   *key = (struct dt_key *)&id->qid_uid;
649         int                              rc;
650         ENTRY;
651
652         LASSERT(dt_object_exists(obj));
653         LASSERT(obj->do_index_ops != NULL);
654
655         /* lock index */
656         dt_write_lock(env, obj, 0);
657
658         /* check whether there is already an existing record for this ID */
659         rc = dt_lookup(env, obj, (struct dt_rec *)&qti->qti_rec, key,
660                        BYPASS_CAPA);
661         if (rc == 0) {
662                 /* delete existing record in order to replace it */
663                 rc = dt_delete(env, obj, key, th, BYPASS_CAPA);
664                 if (rc)
665                         GOTO(out, rc);
666         } else if (rc == -ENOENT) {
667                 /* probably first insert */
668                 rc = 0;
669         } else {
670                 GOTO(out, rc);
671         }
672
673         if (rec != NULL) {
674                 /* insert record with updated quota settings */
675                 rc = dt_insert(env, obj, rec, key, th, BYPASS_CAPA, 1);
676                 if (rc) {
677                         /* try to insert the old one */
678                         rc = dt_insert(env, obj, (struct dt_rec *)&qti->qti_rec,
679                                        key, th, BYPASS_CAPA, 1);
680                         LASSERTF(rc == 0, "failed to insert record in quota "
681                                  "index "DFID"\n",
682                                  PFID(lu_object_fid(&obj->do_lu)));
683                         GOTO(out, rc);
684                 }
685         }
686
687         if (flags != 0) {
688                 LASSERT(ver);
689                 if (flags & LQUOTA_BUMP_VER) {
690                         /* caller wants to bump the version, let's first read
691                          * it */
692                         *ver = dt_version_get(env, obj);
693                         (*ver)++;
694                 } else {
695                         LASSERT(flags & LQUOTA_SET_VER);
696                 }
697                 dt_version_set(env, obj, *ver, th);
698         }
699
700         EXIT;
701 out:
702         dt_write_unlock(env, obj);
703         return rc;
704 }
705
706 /*
707  * Update version of an index file
708  *
709  * \param env - is the environment passed by the caller
710  * \param dev - is the backend dt device storing the index file
711  * \param obj - is the on-disk index that should be updated
712  * \param ver - is the new version
713  */
714 int lquota_disk_update_ver(const struct lu_env *env, struct dt_device *dev,
715                            struct dt_object *obj, __u64 ver)
716 {
717         struct thandle  *th;
718         int              rc;
719         ENTRY;
720
721         th = dt_trans_create(env, dev);
722         if (IS_ERR(th))
723                 RETURN(PTR_ERR(th));
724
725         rc = dt_declare_version_set(env, obj, th);
726         if (rc)
727                 GOTO(out, rc);
728
729         rc = dt_trans_start_local(env, dev, th);
730         if (rc)
731                 GOTO(out, rc);
732         th->th_sync = 1;
733
734         dt_version_set(env, obj, ver, th);
735         EXIT;
736 out:
737         dt_trans_stop(env, dev, th);
738         return rc;
739 }
740
741 /*
742  * Write a global record
743  *
744  * \param env - is the environment passed by the caller
745  * \param obj - is the on-disk global index to be updated
746  * \param id  - index to be updated
747  * \param rec - record to be written
748  */
749 int lquota_disk_write_glb(const struct lu_env *env, struct dt_object *obj,
750                           __u64 id, struct lquota_glb_rec *rec)
751 {
752         struct dt_device        *dev = lu2dt_dev(obj->do_lu.lo_dev);
753         struct thandle          *th;
754         struct dt_key           *key = (struct dt_key *)&id;
755         int                      rc;
756         ENTRY;
757
758         th = dt_trans_create(env, dev);
759         if (IS_ERR(th))
760                 RETURN(PTR_ERR(th));
761
762         /* the entry with 0 key can always be found in IAM file. */
763         if (id == 0) {
764                 rc = dt_declare_delete(env, obj, key, th);
765                 if (rc)
766                         GOTO(out, rc);
767         }
768
769         rc = dt_declare_insert(env, obj, (struct dt_rec *)rec, key, th);
770         if (rc)
771                 GOTO(out, rc);
772
773         rc = dt_trans_start_local(env, dev, th);
774         if (rc)
775                 GOTO(out, rc);
776
777         dt_write_lock(env, obj, 0);
778
779         if (id == 0) {
780                 struct lquota_glb_rec *tmp;
781
782                 OBD_ALLOC_PTR(tmp);
783                 if (tmp == NULL)
784                         GOTO(out_lock, rc = -ENOMEM);
785
786                 rc = dt_lookup(env, obj, (struct dt_rec *)tmp, key,
787                                BYPASS_CAPA);
788
789                 OBD_FREE_PTR(tmp);
790                 if (rc == 0) {
791                         rc = dt_delete(env, obj, key, th, BYPASS_CAPA);
792                         if (rc)
793                                 GOTO(out_lock, rc);
794                 }
795                 rc = 0;
796         }
797
798         rc = dt_insert(env, obj, (struct dt_rec *)rec, key, th, BYPASS_CAPA, 1);
799 out_lock:
800         dt_write_unlock(env, obj);
801 out:
802         dt_trans_stop(env, dev, th);
803         RETURN(rc);
804 }
805 EXPORT_SYMBOL(lquota_disk_write_glb);