Whamcloud - gitweb
ef81a47c8b67e59188091b5ab5649d4163711bd4
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2014, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <lnet/types.h>
54 #include <lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
63 /* list of registered nodemap index files */
64 static LIST_HEAD(ncf_list_head);
65 static DEFINE_MUTEX(ncf_list_lock);
66
/* lu_nodemap flag bits, as packed into the ncr_flags field of a cluster rec */
enum nm_flag_shifts {
        NM_FL_ALLOW_ROOT_ACCESS = 1 << 0,
        NM_FL_TRUST_CLIENT_IDS  = 1 << 1,
};
72
73 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id)
74 {
75         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
76                                                         NODEMAP_CLUSTER_IDX));
77         nk->nk_unused = 0;
78 }
79
80 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
81                                      const struct lu_nodemap *nodemap)
82 {
83         CLASSERT(sizeof(nr->ncr.ncr_name) == sizeof(nodemap->nm_name));
84
85         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nodemap->nm_name));
86         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
87         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
88         nr->ncr.ncr_flags = cpu_to_le32(
89                 (nodemap->nmf_trust_client_ids ? NM_FL_TRUST_CLIENT_IDS : 0) |
90                 (nodemap->nmf_allow_root_access ? NM_FL_ALLOW_ROOT_ACCESS : 0));
91 }
92
93 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
94                                    enum nodemap_id_type id_type,
95                                    u32 id_client)
96 {
97         enum nodemap_idx_type idx_type;
98
99         if (id_type == NODEMAP_UID)
100                 idx_type = NODEMAP_UIDMAP_IDX;
101         else
102                 idx_type = NODEMAP_GIDMAP_IDX;
103
104         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
105         nk->nk_id_client = cpu_to_le32(id_client);
106 }
107
108 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
109 {
110         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
111 }
112
113 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
114                                    unsigned int rn_id)
115 {
116         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
117                                                         NODEMAP_RANGE_IDX));
118         nk->nk_range_id = cpu_to_le32(rn_id);
119 }
120
121 static void nodemap_range_rec_init(union nodemap_rec *nr,
122                                    const lnet_nid_t nid[2])
123 {
124         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
125         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
126 }
127
128 static void nodemap_global_key_init(struct nodemap_key *nk)
129 {
130         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
131         nk->nk_unused = 0;
132 }
133
134 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
135 {
136         nr->ngr.ngr_is_active = active;
137 }
138
139 /* should be called with dt_write lock */
140 static void nodemap_inc_version(const struct lu_env *env,
141                                 struct dt_object *nodemap_idx,
142                                 struct thandle *th)
143 {
144         u64 ver = dt_version_get(env, nodemap_idx);
145         dt_version_set(env, nodemap_idx, ver + 1, th);
146 }
147
148 static int nodemap_idx_insert(struct lu_env *env,
149                               struct dt_object *idx,
150                               const struct nodemap_key *nk,
151                               const union nodemap_rec *nr)
152 {
153         struct thandle          *th;
154         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
155         int                      rc;
156
157         CLASSERT(sizeof(union nodemap_rec) == 32);
158
159         th = dt_trans_create(env, dev);
160
161         if (IS_ERR(th))
162                 GOTO(out, rc = PTR_ERR(th));
163
164         rc = dt_declare_insert(env, idx,
165                                (const struct dt_rec *)nr,
166                                (const struct dt_key *)nk, th);
167         if (rc != 0)
168                 GOTO(out, rc);
169
170         rc = dt_declare_version_set(env, idx, th);
171         if (rc != 0)
172                 GOTO(out, rc);
173
174         rc = dt_trans_start_local(env, dev, th);
175         if (rc != 0)
176                 GOTO(out, rc);
177
178         dt_write_lock(env, idx, 0);
179
180         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
181                        (const struct dt_key *)nk, th, 1);
182
183         nodemap_inc_version(env, idx, th);
184         dt_write_unlock(env, idx);
185 out:
186         dt_trans_stop(env, dev, th);
187
188         return rc;
189 }
190
191 static int nodemap_idx_update(struct lu_env *env,
192                               struct dt_object *idx,
193                               const struct nodemap_key *nk,
194                               const union nodemap_rec *nr)
195 {
196         struct thandle          *th;
197         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
198         int                      rc = 0;
199
200         th = dt_trans_create(env, dev);
201
202         if (IS_ERR(th))
203                 GOTO(out, rc = PTR_ERR(th));
204
205         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
206         if (rc != 0)
207                 GOTO(out, rc);
208
209         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
210                                (const struct dt_key *)nk, th);
211         if (rc != 0)
212                 GOTO(out, rc);
213
214         rc = dt_declare_version_set(env, idx, th);
215         if (rc != 0)
216                 GOTO(out, rc);
217
218         rc = dt_trans_start_local(env, dev, th);
219         if (rc != 0)
220                 GOTO(out, rc);
221
222         dt_write_lock(env, idx, 0);
223
224         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
225         if (rc != 0)
226                 GOTO(out_lock, rc);
227
228         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
229                        (const struct dt_key *)nk, th, 1);
230         if (rc != 0)
231                 GOTO(out_lock, rc);
232
233         nodemap_inc_version(env, idx, th);
234 out_lock:
235         dt_write_unlock(env, idx);
236 out:
237         dt_trans_stop(env, dev, th);
238
239         return rc;
240 }
241
242 static int nodemap_idx_delete(struct lu_env *env,
243                               struct dt_object *idx,
244                               const struct nodemap_key *nk,
245                               const union nodemap_rec *unused)
246 {
247         struct thandle          *th;
248         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
249         int                      rc = 0;
250
251         th = dt_trans_create(env, dev);
252
253         if (IS_ERR(th))
254                 GOTO(out, rc = PTR_ERR(th));
255
256         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         rc = dt_declare_version_set(env, idx, th);
261         if (rc != 0)
262                 GOTO(out, rc);
263
264         rc = dt_trans_start_local(env, dev, th);
265         if (rc != 0)
266                 GOTO(out, rc);
267
268         dt_write_lock(env, idx, 0);
269
270         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
271
272         nodemap_inc_version(env, idx, th);
273
274         dt_write_unlock(env, idx);
275 out:
276         dt_trans_stop(env, dev, th);
277
278         return rc;
279 }
280
/* common signature of the index insert/update/delete helpers */
typedef int (*nm_idx_cb_t)(struct lu_env *env, struct dt_object *idx,
                           const struct nodemap_key *nk,
                           const union nodemap_rec *nr);
285
286 /**
287  * Iterates through all the registered nodemap_config_files and calls the
288  * given callback with the ncf as a parameter, as well as the given key and rec.
289  *
290  * \param       cb_f            callback function to call
291  * \param       nk              key of the record to act upon
292  * \param       nr              record to act upon, NULL for the delete action
293  */
294 static int nodemap_idx_action(nm_idx_cb_t cb_f, struct nodemap_key *nk,
295                               union nodemap_rec *nr)
296 {
297         struct nm_config_file   *ncf;
298         struct lu_env            env;
299         int                      rc = 0;
300         int                      rc2 = 0;
301
302         rc = lu_env_init(&env, LCT_LOCAL);
303         if (rc != 0)
304                 return rc;
305
306         mutex_lock(&ncf_list_lock);
307         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
308                 rc2 = cb_f(&env, ncf->ncf_obj, nk, nr);
309                 if (rc2 < 0) {
310                         CWARN("%s: error writing to nodemap config: rc = %d\n",
311                               ncf->ncf_obj->do_lu.lo_dev->ld_obd->obd_name, rc);
312                         rc = rc2;
313                 }
314         }
315         mutex_unlock(&ncf_list_lock);
316         lu_env_fini(&env);
317
318         return 0;
319 }
320
/* selects between inserting a new record and replacing an existing one */
enum nm_add_update {
        NM_ADD,
        NM_UPDATE,
};
325
326 static int nodemap_idx_nodemap_add_update(const struct lu_nodemap *nodemap,
327                                           enum nm_add_update update)
328 {
329         struct nodemap_key       nk;
330         union nodemap_rec        nr;
331         int rc = 0;
332
333         ENTRY;
334
335         nodemap_cluster_key_init(&nk, nodemap->nm_id);
336         nodemap_cluster_rec_init(&nr, nodemap);
337
338         if (update == NM_UPDATE)
339                 rc = nodemap_idx_action(nodemap_idx_update, &nk, &nr);
340         else
341                 rc = nodemap_idx_action(nodemap_idx_insert, &nk, &nr);
342
343         RETURN(rc);
344 }
345
346 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
347 {
348         return nodemap_idx_nodemap_add_update(nodemap, NM_ADD);
349 }
350
351 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
352 {
353         return nodemap_idx_nodemap_add_update(nodemap, NM_UPDATE);
354 }
355
356 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
357 {
358         struct rb_root           root;
359         struct lu_idmap         *idmap;
360         struct lu_idmap         *temp;
361         struct lu_nid_range     *range;
362         struct lu_nid_range     *range_temp;
363         struct nodemap_key       nk;
364         int                      rc = 0;
365         int                      rc2 = 0;
366
367         ENTRY;
368
369         root = nodemap->nm_fs_to_client_uidmap;
370         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
371                                                 id_fs_to_client) {
372                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
373                                        idmap->id_client);
374                 rc2 = nodemap_idx_action(nodemap_idx_delete, &nk, NULL);
375                 if (rc2 < 0)
376                         rc = rc2;
377         }
378
379         root = nodemap->nm_client_to_fs_gidmap;
380         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
381                                                 id_client_to_fs) {
382                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
383                                        idmap->id_client);
384                 rc2 = nodemap_idx_action(nodemap_idx_delete, &nk, NULL);
385                 if (rc2 < 0)
386                         rc = rc2;
387         }
388
389         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
390                                  rn_list) {
391                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
392                 rc2 = nodemap_idx_action(nodemap_idx_delete, &nk, NULL);
393                 if (rc2 < 0)
394                         rc = rc2;
395         }
396
397         nodemap_cluster_key_init(&nk, nodemap->nm_id);
398         rc2 = nodemap_idx_action(nodemap_idx_delete, &nk, NULL);
399         if (rc2 < 0)
400                 rc = rc2;
401
402         RETURN(rc);
403 }
404
405 int nodemap_idx_range_add(const struct lu_nid_range *range,
406                           const lnet_nid_t nid[2])
407 {
408         struct nodemap_key       nk;
409         union nodemap_rec        nr;
410         ENTRY;
411
412         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
413         nodemap_range_rec_init(&nr, nid);
414
415         RETURN(nodemap_idx_action(nodemap_idx_insert, &nk, &nr));
416 }
417
418 int nodemap_idx_range_del(const struct lu_nid_range *range)
419 {
420         struct nodemap_key       nk;
421         ENTRY;
422
423         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
424
425         RETURN(nodemap_idx_action(nodemap_idx_delete, &nk, NULL));
426 }
427
428 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
429                           enum nodemap_id_type id_type,
430                           const u32 map[2])
431 {
432         struct nodemap_key       nk;
433         union nodemap_rec        nr;
434         ENTRY;
435
436         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
437         nodemap_idmap_rec_init(&nr, map[1]);
438
439         RETURN(nodemap_idx_action(nodemap_idx_insert, &nk, &nr));
440 }
441
442 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
443                           enum nodemap_id_type id_type,
444                           const u32 map[2])
445 {
446         struct nodemap_key       nk;
447         ENTRY;
448
449         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
450
451         RETURN(nodemap_idx_action(nodemap_idx_delete, &nk, NULL));
452 }
453
454 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
455 {
456         struct nodemap_key       nk;
457         union nodemap_rec        nr;
458         ENTRY;
459
460         nodemap_global_key_init(&nk);
461         nodemap_global_rec_init(&nr, value);
462
463         if (update == NM_UPDATE)
464                 RETURN(nodemap_idx_action(nodemap_idx_update, &nk, &nr));
465         else
466                 RETURN(nodemap_idx_action(nodemap_idx_insert, &nk, &nr));
467 }
468
469 int nodemap_idx_nodemap_activate(bool value)
470 {
471         return nodemap_idx_global_add_update(value, NM_UPDATE);
472 }
473
474 /**
475  * Process a key/rec pair and modify the new configuration.
476  *
477  * \param       config          configuration to update with this key/rec data
478  * \param       key             key of the record that was loaded
479  * \param       rec             record that was loaded
480  * \param       recent_nodemap  last referenced nodemap
481  * \retval      type of record processed, see enum #nodemap_idx_type
482  * \retval      -ENOENT         range or map loaded before nodemap record
483  * \retval      -EINVAL         duplicate nodemap cluster records found with
484  *                              different IDs, or nodemap has invalid name
485  * \retval      -ENOMEM
486  */
487 static int nodemap_process_keyrec(struct nodemap_config *config,
488                                   const struct nodemap_key *key,
489                                   const union nodemap_rec *rec,
490                                   struct lu_nodemap **recent_nodemap)
491 {
492         struct lu_nodemap       *nodemap = NULL;
493         enum nodemap_idx_type    type;
494         enum nodemap_id_type     id_type;
495         u8                       flags;
496         u32                      nodemap_id;
497         lnet_nid_t               nid[2];
498         u32                      map[2];
499         int                      rc;
500
501         CLASSERT(sizeof(union nodemap_rec) == 32);
502
503         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
504         type = nm_idx_get_type(nodemap_id);
505         nodemap_id = nm_idx_set_type(nodemap_id, 0);
506
507         CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
508                nodemap_id, type);
509
510         /* find the correct nodemap in the load list */
511         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
512             type == NODEMAP_GIDMAP_IDX) {
513                 struct lu_nodemap *tmp = NULL;
514
515                 nodemap = *recent_nodemap;
516
517                 if (nodemap == NULL)
518                         GOTO(out, rc = -ENOENT);
519
520                 if (nodemap->nm_id != nodemap_id) {
521                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
522                                 if (tmp->nm_id == nodemap_id) {
523                                         nodemap = tmp;
524                                         break;
525                                 }
526
527                         if (nodemap->nm_id != nodemap_id)
528                                 GOTO(out, rc = -ENOENT);
529                 }
530
531                 /* update most recently used nodemap if necessay */
532                 if (nodemap != *recent_nodemap)
533                         *recent_nodemap = nodemap;
534         }
535
536         switch (type) {
537         case NODEMAP_EMPTY_IDX:
538                 if (nodemap_id != 0)
539                         CWARN("Found nodemap config record without type field, "
540                               " nodemap_id=%d. nodemap config file corrupt?\n",
541                               nodemap_id);
542                 break;
543         case NODEMAP_CLUSTER_IDX:
544                 nodemap = cfs_hash_lookup(config->nmc_nodemap_hash,
545                                           rec->ncr.ncr_name);
546                 if (nodemap == NULL) {
547                         if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID) {
548                                 nodemap = nodemap_create(rec->ncr.ncr_name,
549                                                          config, 1);
550                                 config->nmc_default_nodemap = nodemap;
551                         } else {
552                                 nodemap = nodemap_create(rec->ncr.ncr_name,
553                                                          config, 0);
554                         }
555                         if (IS_ERR(nodemap))
556                                 GOTO(out, rc = PTR_ERR(nodemap));
557
558                         /* we need to override the local ID with the saved ID */
559                         nodemap->nm_id = nodemap_id;
560                         if (nodemap_id > config->nmc_nodemap_highest_id)
561                                 config->nmc_nodemap_highest_id = nodemap_id;
562
563                 } else if (nodemap->nm_id != nodemap_id) {
564                         nodemap_putref(nodemap);
565                         GOTO(out, rc = -EINVAL);
566                 }
567
568                 nodemap->nm_squash_uid =
569                                 le32_to_cpu(rec->ncr.ncr_squash_uid);
570                 nodemap->nm_squash_gid =
571                                 le32_to_cpu(rec->ncr.ncr_squash_gid);
572
573                 flags = le32_to_cpu(rec->ncr.ncr_flags);
574                 nodemap->nmf_allow_root_access =
575                                         flags & NM_FL_ALLOW_ROOT_ACCESS;
576                 nodemap->nmf_trust_client_ids =
577                                         flags & NM_FL_TRUST_CLIENT_IDS;
578
579                 if (*recent_nodemap == NULL) {
580                         *recent_nodemap = nodemap;
581                         INIT_LIST_HEAD(&nodemap->nm_list);
582                 } else {
583                         list_add(&nodemap->nm_list,
584                                  &(*recent_nodemap)->nm_list);
585                 }
586                 nodemap_putref(nodemap);
587                 break;
588         case NODEMAP_RANGE_IDX:
589                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
590                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
591
592                 rc = nodemap_add_range_helper(config, nodemap, nid,
593                                         le32_to_cpu(key->nk_range_id));
594                 if (rc != 0)
595                         GOTO(out, rc);
596                 break;
597         case NODEMAP_UIDMAP_IDX:
598         case NODEMAP_GIDMAP_IDX:
599                 map[0] = le32_to_cpu(key->nk_id_client);
600                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
601
602                 if (type == NODEMAP_UIDMAP_IDX)
603                         id_type = NODEMAP_UID;
604                 else
605                         id_type = NODEMAP_GID;
606
607                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
608                 if (rc != 0)
609                         GOTO(out, rc);
610                 break;
611         case NODEMAP_GLOBAL_IDX:
612                 config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
613                 break;
614         default:
615                 CERROR("got keyrec pair for unknown type %d\n", type);
616                 break;
617         }
618         rc = type;
619
620 out:
621         return rc;
622 }
623
624 static int nodemap_load_entries(const struct lu_env *env,
625                                 struct dt_object *nodemap_idx)
626 {
627         const struct dt_it_ops  *iops;
628         struct dt_it            *it;
629         struct lu_nodemap       *recent_nodemap = NULL;
630         struct nodemap_config   *new_config = NULL;
631         u64                      hash = 0;
632         bool                     activate_nodemap = false;
633         bool                     loaded_global_idx = false;
634         int                      rc = 0;
635
636         ENTRY;
637
638         iops = &nodemap_idx->do_index_ops->dio_it;
639
640         dt_read_lock(env, nodemap_idx, 0);
641         it = iops->init(env, nodemap_idx, 0);
642         if (IS_ERR(it))
643                 GOTO(out, rc = PTR_ERR(it));
644
645         rc = iops->load(env, it, hash);
646         if (rc == 0) {
647                 rc = iops->next(env, it);
648                 if (rc != 0)
649                         GOTO(out_iops, rc = 0);
650         }
651
652         /* acquires active config lock */
653         new_config = nodemap_config_alloc();
654         if (IS_ERR(new_config)) {
655                 rc = PTR_ERR(new_config);
656                 new_config = NULL;
657                 GOTO(out_lock, rc);
658         }
659
660         do {
661                 struct nodemap_key *key;
662                 union nodemap_rec rec;
663
664                 key = (struct nodemap_key *)iops->key(env, it);
665                 rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
666                 if (rc != -ESTALE) {
667                         if (rc != 0)
668                                 GOTO(out_lock, rc);
669                         rc = nodemap_process_keyrec(new_config, key, &rec,
670                                                     &recent_nodemap);
671                         if (rc < 0)
672                                 GOTO(out_lock, rc);
673                         if (rc == NODEMAP_GLOBAL_IDX)
674                                 loaded_global_idx = true;
675                 }
676
677                 do
678                         rc = iops->next(env, it);
679                 while (rc == -ESTALE);
680         } while (rc == 0);
681
682         if (rc > 0)
683                 rc = 0;
684
685 out_lock:
686         if (rc != 0)
687                 nodemap_config_dealloc(new_config);
688         else
689                 /* creating new default needs to be done outside dt read lock */
690                 activate_nodemap = true;
691 out_iops:
692         iops->put(env, it);
693         iops->fini(env, it);
694 out:
695         dt_read_unlock(env, nodemap_idx);
696
697         if (rc != 0)
698                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
699                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
700
701         if (!activate_nodemap)
702                 RETURN(rc);
703
704         if (new_config->nmc_default_nodemap == NULL) {
705                 /* new MGS won't have a default nm on disk, so create it here */
706                 new_config->nmc_default_nodemap =
707                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
708                 if (IS_ERR(new_config->nmc_default_nodemap)) {
709                         rc = PTR_ERR(new_config->nmc_default_nodemap);
710                 } else {
711                         rc = nodemap_idx_nodemap_add_update(
712                                         new_config->nmc_default_nodemap,
713                                         NM_ADD);
714                         nodemap_putref(new_config->nmc_default_nodemap);
715                 }
716         }
717
718         /* new nodemap config won't have an active/inactive record */
719         if (rc == 0 && loaded_global_idx == false)
720                 rc = nodemap_idx_global_add_update(false, NM_ADD);
721
722         if (rc == 0)
723                 nodemap_config_set_active(new_config);
724         else
725                 nodemap_config_dealloc(new_config);
726
727         RETURN(rc);
728 }
729
730 /**
731  * Register a dt_object representing the config index file. This should be
732  * called by targets in order to load the nodemap configuration from disk. The
733  * dt_object should be created with local_index_find_or_create and the index
734  * features should be enabled with do_index_try.
735  *
736  * \param obj   dt_object returned by local_index_find_or_create
737  *
738  * \retval      on success: nm_config_file handle for later deregistration
739  * \retval      -ENOMEM         memory allocation failure
740  * \retval      -ENOENT         error loading nodemap config
741  * \retval      -EINVAL         error loading nodemap config
742  */
743 struct nm_config_file *nm_config_file_register(const struct lu_env *env,
744                                                struct dt_object *obj)
745 {
746         struct nm_config_file *ncf;
747         bool load_entries = false;
748         int rc;
749         ENTRY;
750
751         OBD_ALLOC_PTR(ncf);
752         if (ncf == NULL)
753                 RETURN(ERR_PTR(-ENOMEM));
754
755         ncf->ncf_obj = obj;
756         mutex_lock(&ncf_list_lock);
757
758         /* if this is first config file, we load it from disk */
759         if (list_empty(&ncf_list_head))
760                 load_entries = true;
761
762         list_add(&ncf->ncf_list, &ncf_list_head);
763         mutex_unlock(&ncf_list_lock);
764
765         if (load_entries) {
766                 rc = nodemap_load_entries(env, obj);
767                 if (rc < 0) {
768                         mutex_lock(&ncf_list_lock);
769                         list_del(&ncf->ncf_list);
770                         mutex_unlock(&ncf_list_lock);
771                         OBD_FREE_PTR(ncf);
772                         RETURN(ERR_PTR(rc));
773                 }
774         }
775
776         RETURN(ncf);
777 }
778 EXPORT_SYMBOL(nm_config_file_register);
779
780 /**
781  * Deregister a nm_config_file. Should be called by targets during cleanup.
782  *
783  * \param ncf   config file to deregister
784  */
785 void nm_config_file_deregister(const struct lu_env *env,
786                                struct nm_config_file *ncf)
787 {
788         ENTRY;
789
790         lu_object_put(env, &ncf->ncf_obj->do_lu);
791
792         mutex_lock(&ncf_list_lock);
793         list_del(&ncf->ncf_list);
794         mutex_unlock(&ncf_list_lock);
795         OBD_FREE_PTR(ncf);
796
797         EXIT;
798 }
799 EXPORT_SYMBOL(nm_config_file_deregister);
800
801 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
802                               struct lu_nodemap **recent_nodemap)
803 {
804         struct nodemap_key *key;
805         union nodemap_rec *rec;
806         char *entry;
807         int j;
808         int k;
809         int rc = 0;
810         int size = dt_nodemap_features.dif_keysize_max +
811                    dt_nodemap_features.dif_recsize_max;
812
813         for (j = 0; j < LU_PAGE_COUNT; j++) {
814                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
815                         return -EINVAL;
816
817                 /* get and process keys and records from page */
818                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
819                         entry = lip->lp_idx.lip_entries + k * size;
820                         key = (struct nodemap_key *)entry;
821
822                         entry += dt_nodemap_features.dif_keysize_max;
823                         rec = (union nodemap_rec *)entry;
824
825                         rc = nodemap_process_keyrec(config, key, rec,
826                                                     recent_nodemap);
827                         if (rc < 0)
828                                 return rc;
829                 }
830                 lip++;
831         }
832         return 0;
833 }
834 EXPORT_SYMBOL(nodemap_process_idx_pages);
835
836 int nodemap_index_read(struct lu_env *env,
837                        struct nm_config_file *ncf,
838                        struct idx_info *ii,
839                        const struct lu_rdpg *rdpg)
840 {
841         struct dt_object        *nodemap_idx = ncf->ncf_obj;
842         __u64                    version;
843         int                      rc = 0;
844
845         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
846         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
847
848         dt_read_lock(env, nodemap_idx, 0);
849         version = dt_version_get(env, nodemap_idx);
850         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
851                 CDEBUG(D_INFO, "nodemap config changed while sending, "
852                                "old "LPU64", new "LPU64"\n",
853                        ii->ii_version,
854                        version);
855                 ii->ii_hash_end = 0;
856         } else {
857                 rc = dt_index_walk(env, nodemap_idx, rdpg, NULL, ii);
858                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
859         }
860
861         if (rc >= 0)
862                 ii->ii_version = version;
863
864         dt_read_unlock(env, nodemap_idx);
865         return rc;
866 }
867 EXPORT_SYMBOL(nodemap_index_read);
868
869 /**
870  * Returns the current nodemap configuration to MGC by walking the nodemap
871  * config index and storing it in the response buffer.
872  *
873  * \param       req             incoming MGS_CONFIG_READ request
874  * \retval      0               success
875  * \retval      -EINVAL         malformed request
876  * \retval      -ENOTCONN       client evicted/reconnected already
877  * \retval      -ETIMEDOUT      client timeout or network error
878  * \retval      -ENOMEM
879  */
880 int nodemap_get_config_req(struct obd_device *mgs_obd,
881                            struct ptlrpc_request *req)
882 {
883         struct mgs_config_body *body;
884         struct mgs_config_res *res;
885         struct lu_rdpg rdpg;
886         struct idx_info nodemap_ii;
887         struct ptlrpc_bulk_desc *desc;
888         struct l_wait_info lwi;
889         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
890         int i;
891         int page_count;
892         int bytes = 0;
893         int rc = 0;
894
895         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
896         if (!body)
897                 RETURN(-EINVAL);
898
899         if (body->mcb_type != CONFIG_T_NODEMAP)
900                 RETURN(-EINVAL);
901
902         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
903         rdpg.rp_npages = (rdpg.rp_count + PAGE_CACHE_SIZE - 1) >>
904                 PAGE_CACHE_SHIFT;
905         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
906                 RETURN(-EINVAL);
907
908         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
909                body->mcb_name, rdpg.rp_count);
910
911         /* allocate pages to store the containers */
912         OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
913         if (rdpg.rp_pages == NULL)
914                 RETURN(-ENOMEM);
915         for (i = 0; i < rdpg.rp_npages; i++) {
916                 rdpg.rp_pages[i] = alloc_page(GFP_IOFS);
917                 if (rdpg.rp_pages[i] == NULL)
918                         GOTO(out, rc = -ENOMEM);
919         }
920
921         rdpg.rp_hash = body->mcb_offset;
922         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
923         nodemap_ii.ii_flags = II_FL_NOHASH;
924         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
925
926         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
927                                    mgs_obd->u.obt.obt_nodemap_config_file,
928                                    &nodemap_ii, &rdpg);
929         if (bytes < 0)
930                 GOTO(out, rc = bytes);
931
932         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
933
934         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
935         if (res == NULL)
936                 GOTO(out, rc = -EINVAL);
937         res->mcr_offset = nodemap_ii.ii_hash_end;
938         res->mcr_size = bytes;
939
940         page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
941         LASSERT(page_count <= rdpg.rp_count);
942         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
943                                     PTLRPC_BULK_PUT_SOURCE |
944                                         PTLRPC_BULK_BUF_KIOV,
945                                     MGS_BULK_PORTAL,
946                                     &ptlrpc_bulk_kiov_pin_ops);
947         if (desc == NULL)
948                 GOTO(out, rc = -ENOMEM);
949
950         for (i = 0; i < page_count && bytes > 0; i++) {
951                 ptlrpc_prep_bulk_page_pin(desc, rdpg.rp_pages[i], 0,
952                                           min_t(int, bytes, PAGE_CACHE_SIZE));
953                 bytes -= PAGE_CACHE_SIZE;
954         }
955
956         rc = target_bulk_io(req->rq_export, desc, &lwi);
957         ptlrpc_free_bulk(desc);
958
959 out:
960         if (rdpg.rp_pages != NULL) {
961                 for (i = 0; i < rdpg.rp_npages; i++)
962                         if (rdpg.rp_pages[i] != NULL)
963                                 __free_page(rdpg.rp_pages[i]);
964                 OBD_FREE(rdpg.rp_pages,
965                          rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
966         }
967         return rc;
968 }
969 EXPORT_SYMBOL(nodemap_get_config_req);