Whamcloud - gitweb
LU-10855 ptlrpc: remove obsolete OBD RPC opcodes
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
/*
 * List of registered nodemap index files, except the MGS one.
 * NOTE(review): ncf_list_lock presumably serialises access to
 * ncf_list_head -- confirm against the register/deregister code.
 */
static LIST_HEAD(ncf_list_head);
static DEFINE_MUTEX(ncf_list_lock);

/*
 * MGS index is different than others, others are listeners to MGS idx.
 * The nodemap_idx_*() storage functions below write to this index and
 * fail with -EINVAL while it is NULL.
 */
static struct nm_config_file *nodemap_mgs_ncf;
69
/*
 * lu_nodemap flags.
 *
 * Each constant is a single-bit mask (not a shift count, despite the enum
 * name) used to pack the nmf_* booleans of a nodemap into the ncr_flags
 * field of a cluster record.
 */
enum nm_flag_shifts {
	NM_FL_ALLOW_ROOT_ACCESS	= 1 << 0,
	NM_FL_TRUST_CLIENT_IDS	= 1 << 1,
	NM_FL_DENY_UNKNOWN	= 1 << 2,
	NM_FL_MAP_UID_ONLY	= 1 << 3,
	NM_FL_MAP_GID_ONLY	= 1 << 4,
	NM_FL_ENABLE_AUDIT	= 1 << 5,
};
79
80 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id)
81 {
82         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
83                                                         NODEMAP_CLUSTER_IDX));
84         nk->nk_unused = 0;
85 }
86
87 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
88                                      const struct lu_nodemap *nodemap)
89 {
90         CLASSERT(sizeof(nr->ncr.ncr_name) == sizeof(nodemap->nm_name));
91
92         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nodemap->nm_name));
93         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
94         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
95         nr->ncr.ncr_flags = cpu_to_le32(
96                 (nodemap->nmf_trust_client_ids ?
97                         NM_FL_TRUST_CLIENT_IDS : 0) |
98                 (nodemap->nmf_allow_root_access ?
99                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
100                 (nodemap->nmf_deny_unknown ?
101                         NM_FL_DENY_UNKNOWN : 0) |
102                 (nodemap->nmf_map_uid_only ?
103                         NM_FL_MAP_UID_ONLY : 0) |
104                 (nodemap->nmf_map_gid_only ?
105                         NM_FL_MAP_GID_ONLY : 0) |
106                 (nodemap->nmf_enable_audit ?
107                         NM_FL_ENABLE_AUDIT : 0));
108 }
109
110 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
111                                    enum nodemap_id_type id_type,
112                                    u32 id_client)
113 {
114         enum nodemap_idx_type idx_type;
115
116         if (id_type == NODEMAP_UID)
117                 idx_type = NODEMAP_UIDMAP_IDX;
118         else
119                 idx_type = NODEMAP_GIDMAP_IDX;
120
121         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
122         nk->nk_id_client = cpu_to_le32(id_client);
123 }
124
125 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
126 {
127         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
128 }
129
130 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
131                                    unsigned int rn_id)
132 {
133         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
134                                                         NODEMAP_RANGE_IDX));
135         nk->nk_range_id = cpu_to_le32(rn_id);
136 }
137
138 static void nodemap_range_rec_init(union nodemap_rec *nr,
139                                    const lnet_nid_t nid[2])
140 {
141         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
142         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
143 }
144
145 static void nodemap_global_key_init(struct nodemap_key *nk)
146 {
147         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
148         nk->nk_unused = 0;
149 }
150
151 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
152 {
153         nr->ngr.ngr_is_active = active;
154 }
155
/*
 * Bump the version of the nodemap index object by one, inside transaction
 * \a th.
 *
 * Should be called with the dt_write lock held on \a nodemap_idx.
 */
static void nodemap_inc_version(const struct lu_env *env,
				struct dt_object *nodemap_idx,
				struct thandle *th)
{
	dt_version_set(env, nodemap_idx,
		       dt_version_get(env, nodemap_idx) + 1, th);
}
164
/*
 * Mode argument of nodemap_cache_find_create().
 * NCFC_CREATE_NEW asks for any existing index to be removed and a fresh
 * one created.
 */
enum ncfc_find_create {
	NCFC_CREATE_NEW	= 0x1,
};
168
169 static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
170                                                    struct dt_device *dev,
171                                                    struct local_oid_storage *los,
172                                                    enum ncfc_find_create create_new)
173 {
174         struct lu_fid tfid;
175         struct dt_object *root_obj;
176         struct dt_object *nm_obj;
177         int rc = 0;
178
179         rc = dt_root_get(env, dev, &tfid);
180         if (rc < 0)
181                 GOTO(out, nm_obj = ERR_PTR(rc));
182
183         root_obj = dt_locate(env, dev, &tfid);
184         if (unlikely(IS_ERR(root_obj)))
185                 GOTO(out, nm_obj = root_obj);
186
187         rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
188         if (rc == -ENOENT) {
189                 if (dev->dd_rdonly)
190                         GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
191         } else if (rc) {
192                 GOTO(out_root, nm_obj = ERR_PTR(rc));
193         } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
194                 GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
195         }
196
197 again:
198         /* if loading index fails the first time, create new index */
199         if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
200                 CDEBUG(D_INFO, "removing old index, creating new one\n");
201                 rc = local_object_unlink(env, dev, root_obj,
202                                          LUSTRE_NODEMAP_NAME);
203                 if (rc < 0) {
204                         /* XXX not sure the best way to get obd name. */
205                         CERROR("cannot destroy nodemap index: rc = %d\n",
206                                rc);
207                         GOTO(out_root, nm_obj = ERR_PTR(rc));
208                 }
209         }
210
211         nm_obj = local_index_find_or_create(env, los, root_obj,
212                                                 LUSTRE_NODEMAP_NAME,
213                                                 S_IFREG | S_IRUGO | S_IWUSR,
214                                                 &dt_nodemap_features);
215         if (IS_ERR(nm_obj))
216                 GOTO(out_root, nm_obj);
217
218         if (nm_obj->do_index_ops == NULL) {
219                 rc = nm_obj->do_ops->do_index_try(env, nm_obj,
220                                                       &dt_nodemap_features);
221                 /* even if loading from tgt fails, connecting to MGS will
222                  * rewrite the config
223                  */
224                 if (rc < 0) {
225                         dt_object_put(env, nm_obj);
226
227                         if (create_new == NCFC_CREATE_NEW)
228                                 GOTO(out_root, nm_obj = ERR_PTR(rc));
229
230                         CERROR("cannot load nodemap index from disk, creating "
231                                "new index: rc = %d\n", rc);
232                         create_new = NCFC_CREATE_NEW;
233                         goto again;
234                 }
235         }
236
237 out_root:
238         dt_object_put(env, root_obj);
239 out:
240         return nm_obj;
241 }
242
243 static int nodemap_idx_insert(const struct lu_env *env,
244                               struct dt_object *idx,
245                               const struct nodemap_key *nk,
246                               const union nodemap_rec *nr)
247 {
248         struct thandle *th;
249         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
250         int rc;
251
252         CLASSERT(sizeof(union nodemap_rec) == 32);
253
254         th = dt_trans_create(env, dev);
255
256         if (IS_ERR(th))
257                 GOTO(out, rc = PTR_ERR(th));
258
259         rc = dt_declare_insert(env, idx,
260                                (const struct dt_rec *)nr,
261                                (const struct dt_key *)nk, th);
262         if (rc != 0)
263                 GOTO(out, rc);
264
265         rc = dt_declare_version_set(env, idx, th);
266         if (rc != 0)
267                 GOTO(out, rc);
268
269         rc = dt_trans_start_local(env, dev, th);
270         if (rc != 0)
271                 GOTO(out, rc);
272
273         dt_write_lock(env, idx, 0);
274
275         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
276                        (const struct dt_key *)nk, th, 1);
277
278         nodemap_inc_version(env, idx, th);
279         dt_write_unlock(env, idx);
280 out:
281         dt_trans_stop(env, dev, th);
282
283         return rc;
284 }
285
286 static int nodemap_idx_update(const struct lu_env *env,
287                               struct dt_object *idx,
288                               const struct nodemap_key *nk,
289                               const union nodemap_rec *nr)
290 {
291         struct thandle          *th;
292         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
293         int                      rc = 0;
294
295         th = dt_trans_create(env, dev);
296
297         if (IS_ERR(th))
298                 GOTO(out, rc = PTR_ERR(th));
299
300         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
301         if (rc != 0)
302                 GOTO(out, rc);
303
304         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
305                                (const struct dt_key *)nk, th);
306         if (rc != 0)
307                 GOTO(out, rc);
308
309         rc = dt_declare_version_set(env, idx, th);
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         rc = dt_trans_start_local(env, dev, th);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         dt_write_lock(env, idx, 0);
318
319         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
320         if (rc != 0)
321                 GOTO(out_lock, rc);
322
323         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
324                        (const struct dt_key *)nk, th, 1);
325         if (rc != 0)
326                 GOTO(out_lock, rc);
327
328         nodemap_inc_version(env, idx, th);
329 out_lock:
330         dt_write_unlock(env, idx);
331 out:
332         dt_trans_stop(env, dev, th);
333
334         return rc;
335 }
336
337 static int nodemap_idx_delete(const struct lu_env *env,
338                               struct dt_object *idx,
339                               const struct nodemap_key *nk,
340                               const union nodemap_rec *unused)
341 {
342         struct thandle          *th;
343         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
344         int                      rc = 0;
345
346         th = dt_trans_create(env, dev);
347
348         if (IS_ERR(th))
349                 GOTO(out, rc = PTR_ERR(th));
350
351         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
352         if (rc != 0)
353                 GOTO(out, rc);
354
355         rc = dt_declare_version_set(env, idx, th);
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         rc = dt_trans_start_local(env, dev, th);
360         if (rc != 0)
361                 GOTO(out, rc);
362
363         dt_write_lock(env, idx, 0);
364
365         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
366
367         nodemap_inc_version(env, idx, th);
368
369         dt_write_unlock(env, idx);
370 out:
371         dt_trans_stop(env, dev, th);
372
373         return rc;
374 }
375
/*
 * Selects whether a record is inserted as new (NM_ADD) or replaces an
 * existing record with the same key (NM_UPDATE).
 */
enum nm_add_update {
	NM_ADD		= 0,
	NM_UPDATE,
};
380
381 static int nodemap_idx_nodemap_add_update(const struct lu_nodemap *nodemap,
382                                           struct dt_object *idx,
383                                           enum nm_add_update update)
384 {
385         struct nodemap_key nk;
386         union nodemap_rec nr;
387         struct lu_env env;
388         int rc = 0;
389
390         ENTRY;
391
392         rc = lu_env_init(&env, LCT_LOCAL);
393         if (rc)
394                 RETURN(rc);
395
396         nodemap_cluster_key_init(&nk, nodemap->nm_id);
397         nodemap_cluster_rec_init(&nr, nodemap);
398
399         if (update == NM_UPDATE)
400                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
401         else
402                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
403
404         lu_env_fini(&env);
405
406         RETURN(rc);
407 }
408
409 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
410 {
411         if (nodemap_mgs_ncf == NULL) {
412                 CERROR("cannot add nodemap config to non-existing MGS.\n");
413                 return -EINVAL;
414         }
415
416         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
417                                               NM_ADD);
418 }
419
420 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
421 {
422         if (nodemap_mgs_ncf == NULL) {
423                 CERROR("cannot add nodemap config to non-existing MGS.\n");
424                 return -EINVAL;
425         }
426
427         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
428                                               NM_UPDATE);
429 }
430
431 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
432 {
433         struct rb_root           root;
434         struct lu_idmap         *idmap;
435         struct lu_idmap         *temp;
436         struct lu_nid_range     *range;
437         struct lu_nid_range     *range_temp;
438         struct nodemap_key       nk;
439         struct lu_env            env;
440         int                      rc = 0;
441         int                      rc2 = 0;
442
443         ENTRY;
444
445         if (nodemap_mgs_ncf == NULL) {
446                 CERROR("cannot add nodemap config to non-existing MGS.\n");
447                 return -EINVAL;
448         }
449
450         rc = lu_env_init(&env, LCT_LOCAL);
451         if (rc != 0)
452                 RETURN(rc);
453
454         root = nodemap->nm_fs_to_client_uidmap;
455         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
456                                                 id_fs_to_client) {
457                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
458                                        idmap->id_client);
459                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
460                                          &nk, NULL);
461                 if (rc2 < 0)
462                         rc = rc2;
463         }
464
465         root = nodemap->nm_client_to_fs_gidmap;
466         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
467                                                 id_client_to_fs) {
468                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
469                                        idmap->id_client);
470                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
471                                          &nk, NULL);
472                 if (rc2 < 0)
473                         rc = rc2;
474         }
475
476         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
477                                  rn_list) {
478                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
479                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
480                                          &nk, NULL);
481                 if (rc2 < 0)
482                         rc = rc2;
483         }
484
485         nodemap_cluster_key_init(&nk, nodemap->nm_id);
486         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
487         if (rc2 < 0)
488                 rc = rc2;
489
490         lu_env_fini(&env);
491
492         RETURN(rc);
493 }
494
495 int nodemap_idx_range_add(const struct lu_nid_range *range,
496                           const lnet_nid_t nid[2])
497 {
498         struct nodemap_key       nk;
499         union nodemap_rec        nr;
500         struct lu_env            env;
501         int                      rc = 0;
502         ENTRY;
503
504         if (nodemap_mgs_ncf == NULL) {
505                 CERROR("cannot add nodemap config to non-existing MGS.\n");
506                 return -EINVAL;
507         }
508
509         rc = lu_env_init(&env, LCT_LOCAL);
510         if (rc != 0)
511                 RETURN(rc);
512
513         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
514         nodemap_range_rec_init(&nr, nid);
515
516         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
517         lu_env_fini(&env);
518
519         RETURN(rc);
520 }
521
522 int nodemap_idx_range_del(const struct lu_nid_range *range)
523 {
524         struct nodemap_key       nk;
525         struct lu_env            env;
526         int                      rc = 0;
527         ENTRY;
528
529         if (nodemap_mgs_ncf == NULL) {
530                 CERROR("cannot add nodemap config to non-existing MGS.\n");
531                 return -EINVAL;
532         }
533
534         rc = lu_env_init(&env, LCT_LOCAL);
535         if (rc != 0)
536                 RETURN(rc);
537
538         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
539
540         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
541         lu_env_fini(&env);
542
543         RETURN(rc);
544 }
545
546 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
547                           enum nodemap_id_type id_type,
548                           const u32 map[2])
549 {
550         struct nodemap_key       nk;
551         union nodemap_rec        nr;
552         struct lu_env            env;
553         int                      rc = 0;
554         ENTRY;
555
556         if (nodemap_mgs_ncf == NULL) {
557                 CERROR("cannot add nodemap config to non-existing MGS.\n");
558                 return -EINVAL;
559         }
560
561         rc = lu_env_init(&env, LCT_LOCAL);
562         if (rc != 0)
563                 RETURN(rc);
564
565         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
566         nodemap_idmap_rec_init(&nr, map[1]);
567
568         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
569         lu_env_fini(&env);
570
571         RETURN(rc);
572 }
573
574 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
575                           enum nodemap_id_type id_type,
576                           const u32 map[2])
577 {
578         struct nodemap_key       nk;
579         struct lu_env            env;
580         int                      rc = 0;
581         ENTRY;
582
583         if (nodemap_mgs_ncf == NULL) {
584                 CERROR("cannot add nodemap config to non-existing MGS.\n");
585                 return -EINVAL;
586         }
587
588         rc = lu_env_init(&env, LCT_LOCAL);
589         if (rc != 0)
590                 RETURN(rc);
591
592         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
593
594         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
595         lu_env_fini(&env);
596
597         RETURN(rc);
598 }
599
600 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
601 {
602         struct nodemap_key       nk;
603         union nodemap_rec        nr;
604         struct lu_env            env;
605         int                      rc = 0;
606         ENTRY;
607
608         if (nodemap_mgs_ncf == NULL) {
609                 CERROR("cannot add nodemap config to non-existing MGS.\n");
610                 return -EINVAL;
611         }
612
613         rc = lu_env_init(&env, LCT_LOCAL);
614         if (rc != 0)
615                 RETURN(rc);
616
617         nodemap_global_key_init(&nk);
618         nodemap_global_rec_init(&nr, value);
619
620         if (update == NM_UPDATE)
621                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
622                                         &nk, &nr);
623         else
624                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
625                                         &nk, &nr);
626
627         lu_env_fini(&env);
628
629         RETURN(rc);
630 }
631
632 int nodemap_idx_nodemap_activate(bool value)
633 {
634         return nodemap_idx_global_add_update(value, NM_UPDATE);
635 }
636
637 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
638 {
639         u32                      nodemap_id;
640
641         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
642         return nm_idx_get_type(nodemap_id);
643 }
644
645 /**
646  * Process a key/rec pair and modify the new configuration.
647  *
648  * \param       config          configuration to update with this key/rec data
649  * \param       key             key of the record that was loaded
650  * \param       rec             record that was loaded
651  * \param       recent_nodemap  last referenced nodemap
652  * \retval      type of record processed, see enum #nodemap_idx_type
653  * \retval      -ENOENT         range or map loaded before nodemap record
654  * \retval      -EINVAL         duplicate nodemap cluster records found with
655  *                              different IDs, or nodemap has invalid name
656  * \retval      -ENOMEM
657  */
658 static int nodemap_process_keyrec(struct nodemap_config *config,
659                                   const struct nodemap_key *key,
660                                   const union nodemap_rec *rec,
661                                   struct lu_nodemap **recent_nodemap)
662 {
663         struct lu_nodemap *nodemap = NULL;
664         enum nodemap_idx_type type;
665         enum nodemap_id_type id_type;
666         u8 flags;
667         u32 nodemap_id;
668         lnet_nid_t nid[2];
669         u32 map[2];
670         int rc;
671
672         ENTRY;
673
674         CLASSERT(sizeof(union nodemap_rec) == 32);
675
676         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
677         type = nodemap_get_key_type(key);
678         nodemap_id = nm_idx_set_type(nodemap_id, 0);
679
680         CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
681                nodemap_id, type);
682
683         /* find the correct nodemap in the load list */
684         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
685             type == NODEMAP_GIDMAP_IDX) {
686                 struct lu_nodemap *tmp = NULL;
687
688                 nodemap = *recent_nodemap;
689
690                 if (nodemap == NULL)
691                         GOTO(out, rc = -ENOENT);
692
693                 if (nodemap->nm_id != nodemap_id) {
694                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
695                                 if (tmp->nm_id == nodemap_id) {
696                                         nodemap = tmp;
697                                         break;
698                                 }
699
700                         if (nodemap->nm_id != nodemap_id)
701                                 GOTO(out, rc = -ENOENT);
702                 }
703
704                 /* update most recently used nodemap if necessay */
705                 if (nodemap != *recent_nodemap)
706                         *recent_nodemap = nodemap;
707         }
708
709         switch (type) {
710         case NODEMAP_EMPTY_IDX:
711                 if (nodemap_id != 0)
712                         CWARN("Found nodemap config record without type field, "
713                               " nodemap_id=%d. nodemap config file corrupt?\n",
714                               nodemap_id);
715                 break;
716         case NODEMAP_CLUSTER_IDX: {
717                 struct lu_nodemap *old_nm = NULL;
718
719                 nodemap = cfs_hash_lookup(config->nmc_nodemap_hash,
720                                           rec->ncr.ncr_name);
721                 if (nodemap == NULL) {
722                         if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID) {
723                                 nodemap = nodemap_create(rec->ncr.ncr_name,
724                                                          config, 1);
725                                 config->nmc_default_nodemap = nodemap;
726                         } else {
727                                 nodemap = nodemap_create(rec->ncr.ncr_name,
728                                                          config, 0);
729                         }
730                         if (IS_ERR(nodemap))
731                                 GOTO(out, rc = PTR_ERR(nodemap));
732
733                         /* we need to override the local ID with the saved ID */
734                         nodemap->nm_id = nodemap_id;
735                         if (nodemap_id > config->nmc_nodemap_highest_id)
736                                 config->nmc_nodemap_highest_id = nodemap_id;
737
738                 } else if (nodemap->nm_id != nodemap_id) {
739                         nodemap_putref(nodemap);
740                         GOTO(out, rc = -EINVAL);
741                 }
742
743                 nodemap->nm_squash_uid =
744                                 le32_to_cpu(rec->ncr.ncr_squash_uid);
745                 nodemap->nm_squash_gid =
746                                 le32_to_cpu(rec->ncr.ncr_squash_gid);
747
748                 flags = le32_to_cpu(rec->ncr.ncr_flags);
749                 nodemap->nmf_allow_root_access =
750                                         flags & NM_FL_ALLOW_ROOT_ACCESS;
751                 nodemap->nmf_trust_client_ids =
752                                         flags & NM_FL_TRUST_CLIENT_IDS;
753                 nodemap->nmf_deny_unknown =
754                                         flags & NM_FL_DENY_UNKNOWN;
755                 nodemap->nmf_map_uid_only =
756                                         flags & NM_FL_MAP_UID_ONLY;
757                 nodemap->nmf_map_gid_only =
758                                         flags & NM_FL_MAP_GID_ONLY;
759                 nodemap->nmf_enable_audit =
760                                         flags & NM_FL_ENABLE_AUDIT;
761
762                 /* The fileset should be saved otherwise it will be empty
763                  * every time in case of "NODEMAP_CLUSTER_IDX". */
764                 mutex_lock(&active_config_lock);
765                 old_nm = nodemap_lookup(rec->ncr.ncr_name);
766                 if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
767                         strlcpy(nodemap->nm_fileset, old_nm->nm_fileset,
768                                 sizeof(nodemap->nm_fileset));
769                 mutex_unlock(&active_config_lock);
770                 if (!IS_ERR(old_nm))
771                         nodemap_putref(old_nm);
772
773                 if (*recent_nodemap == NULL) {
774                         *recent_nodemap = nodemap;
775                         INIT_LIST_HEAD(&nodemap->nm_list);
776                 } else {
777                         list_add(&nodemap->nm_list,
778                                  &(*recent_nodemap)->nm_list);
779                 }
780                 nodemap_putref(nodemap);
781                 break;
782         }
783         case NODEMAP_RANGE_IDX:
784                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
785                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
786
787                 rc = nodemap_add_range_helper(config, nodemap, nid,
788                                         le32_to_cpu(key->nk_range_id));
789                 if (rc != 0)
790                         GOTO(out, rc);
791                 break;
792         case NODEMAP_UIDMAP_IDX:
793         case NODEMAP_GIDMAP_IDX:
794                 map[0] = le32_to_cpu(key->nk_id_client);
795                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
796
797                 if (type == NODEMAP_UIDMAP_IDX)
798                         id_type = NODEMAP_UID;
799                 else
800                         id_type = NODEMAP_GID;
801
802                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
803                 if (rc != 0)
804                         GOTO(out, rc);
805                 break;
806         case NODEMAP_GLOBAL_IDX:
807                 config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
808                 break;
809         default:
810                 CERROR("got keyrec pair for unknown type %d\n", type);
811                 break;
812         }
813
814         rc = type;
815
816         EXIT;
817
818 out:
819         return rc;
820 }
821
/*
 * Passes made over the nodemap index while loading a configuration:
 * cluster records are read first, all remaining attribute records second.
 */
enum nm_config_passes {
	NM_READ_CLUSTERS	= 0,
	NM_READ_ATTRIBUTES,
};
826
827 static int nodemap_load_entries(const struct lu_env *env,
828                                 struct dt_object *nodemap_idx)
829 {
830         const struct dt_it_ops *iops;
831         struct dt_it *it;
832         struct lu_nodemap *recent_nodemap = NULL;
833         struct nodemap_config *new_config = NULL;
834         u64 hash = 0;
835         bool activate_nodemap = false;
836         bool loaded_global_idx = false;
837         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
838         int rc = 0;
839
840         ENTRY;
841
842         iops = &nodemap_idx->do_index_ops->dio_it;
843
844         dt_read_lock(env, nodemap_idx, 0);
845         it = iops->init(env, nodemap_idx, 0);
846         if (IS_ERR(it))
847                 GOTO(out, rc = PTR_ERR(it));
848
849         rc = iops->load(env, it, hash);
850         if (rc < 0)
851                 GOTO(out_iops_fini, rc);
852
853         /* rc == 0 means we need to advance to record */
854         if (rc == 0) {
855                 rc = iops->next(env, it);
856
857                 if (rc < 0)
858                         GOTO(out_iops_put, rc);
859                 /* rc > 0 is eof, will be checked in while below */
860         } else {
861                 /* rc == 1, we found initial record and can process below */
862                 rc = 0;
863         }
864
865         new_config = nodemap_config_alloc();
866         if (IS_ERR(new_config)) {
867                 rc = PTR_ERR(new_config);
868                 new_config = NULL;
869                 GOTO(out_iops_put, rc);
870         }
871
872         /* rc > 0 is eof, check initial iops->next here as well */
873         while (rc == 0) {
874                 struct nodemap_key *key;
875                 union nodemap_rec rec;
876                 enum nodemap_idx_type key_type;
877
878                 key = (struct nodemap_key *)iops->key(env, it);
879                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
880                 if ((cur_pass == NM_READ_CLUSTERS &&
881                                 key_type == NODEMAP_CLUSTER_IDX) ||
882                     (cur_pass == NM_READ_ATTRIBUTES &&
883                                 key_type != NODEMAP_CLUSTER_IDX &&
884                                 key_type != NODEMAP_EMPTY_IDX)) {
885                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
886                         if (rc != -ESTALE) {
887                                 if (rc != 0)
888                                         GOTO(out_nodemap_config, rc);
889                                 rc = nodemap_process_keyrec(new_config, key, &rec,
890                                                             &recent_nodemap);
891                                 if (rc < 0)
892                                         GOTO(out_nodemap_config, rc);
893                                 if (rc == NODEMAP_GLOBAL_IDX)
894                                         loaded_global_idx = true;
895                         }
896                 }
897
898                 do
899                         rc = iops->next(env, it);
900                 while (rc == -ESTALE);
901
902                 /* move to second pass */
903                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
904                         cur_pass = NM_READ_ATTRIBUTES;
905                         rc = iops->load(env, it, 0);
906                         if (rc == 0)
907                                 rc = iops->next(env, it);
908                         else if (rc > 0)
909                                 rc = 0;
910                         else
911                                 GOTO(out, rc);
912                 }
913         }
914
915         if (rc > 0)
916                 rc = 0;
917
918 out_nodemap_config:
919         if (rc != 0)
920                 nodemap_config_dealloc(new_config);
921         else
922                 /* creating new default needs to be done outside dt read lock */
923                 activate_nodemap = true;
924 out_iops_put:
925         iops->put(env, it);
926 out_iops_fini:
927         iops->fini(env, it);
928 out:
929         dt_read_unlock(env, nodemap_idx);
930
931         if (rc != 0)
932                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
933                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
934
935         if (!activate_nodemap)
936                 RETURN(rc);
937
938         if (new_config->nmc_default_nodemap == NULL) {
939                 /* new MGS won't have a default nm on disk, so create it here */
940                 new_config->nmc_default_nodemap =
941                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
942                 if (IS_ERR(new_config->nmc_default_nodemap)) {
943                         rc = PTR_ERR(new_config->nmc_default_nodemap);
944                 } else {
945                         rc = nodemap_idx_nodemap_add_update(
946                                         new_config->nmc_default_nodemap,
947                                         nodemap_idx,
948                                         NM_ADD);
949                         nodemap_putref(new_config->nmc_default_nodemap);
950                 }
951         }
952
953         /* new nodemap config won't have an active/inactive record */
954         if (rc == 0 && loaded_global_idx == false) {
955                 struct nodemap_key       nk;
956                 union nodemap_rec        nr;
957
958                 nodemap_global_key_init(&nk);
959                 nodemap_global_rec_init(&nr, false);
960                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
961         }
962
963         if (rc == 0)
964                 nodemap_config_set_active(new_config);
965         else
966                 nodemap_config_dealloc(new_config);
967
968         RETURN(rc);
969 }
970
971 /**
972  * Step through active config and write to disk.
973  */
974 struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
975                                             struct dt_device *dev,
976                                             struct local_oid_storage *los)
977 {
978         struct dt_object *o;
979         struct lu_nodemap *nodemap;
980         struct lu_nodemap *nm_tmp;
981         struct lu_nid_range *range;
982         struct lu_nid_range *range_temp;
983         struct lu_idmap *idmap;
984         struct lu_idmap *id_tmp;
985         struct rb_root root;
986         struct nodemap_key nk;
987         union nodemap_rec nr;
988         LIST_HEAD(nodemap_list_head);
989         int rc = 0, rc2;
990
991         ENTRY;
992
993         /* create a new index file to fill with active config */
994         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
995         if (IS_ERR(o))
996                 RETURN(o);
997
998         mutex_lock(&active_config_lock);
999
1000         /* convert hash to list so we don't spin */
1001         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1002                                nm_hash_list_cb, &nodemap_list_head);
1003
1004         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1005                 nodemap_cluster_key_init(&nk, nodemap->nm_id);
1006                 nodemap_cluster_rec_init(&nr, nodemap);
1007
1008                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1009                 if (rc2 < 0) {
1010                         rc = rc2;
1011                         continue;
1012                 }
1013
1014                 down_read(&active_config->nmc_range_tree_lock);
1015                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1016                                          rn_list) {
1017                         lnet_nid_t nid[2] = {
1018                                 range->rn_node.in_extent.start,
1019                                 range->rn_node.in_extent.end
1020                         };
1021                         nodemap_range_key_init(&nk, nodemap->nm_id,
1022                                                range->rn_id);
1023                         nodemap_range_rec_init(&nr, nid);
1024                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1025                         if (rc2 < 0)
1026                                 rc = rc2;
1027                 }
1028                 up_read(&active_config->nmc_range_tree_lock);
1029
1030                 /* we don't need to take nm_idmap_lock because active config
1031                  * lock prevents changes from happening to nodemaps
1032                  */
1033                 root = nodemap->nm_client_to_fs_uidmap;
1034                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1035                                                         id_client_to_fs) {
1036                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1037                                                idmap->id_client);
1038                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1039                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1040                         if (rc2 < 0)
1041                                 rc = rc2;
1042                 }
1043
1044                 root = nodemap->nm_client_to_fs_gidmap;
1045                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1046                                                         id_client_to_fs) {
1047                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1048                                                idmap->id_client);
1049                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1050                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1051                         if (rc2 < 0)
1052                                 rc = rc2;
1053                 }
1054         }
1055         nodemap_global_key_init(&nk);
1056         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1057         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1058         if (rc2 < 0)
1059                 rc = rc2;
1060
1061         mutex_unlock(&active_config_lock);
1062
1063         if (rc < 0) {
1064                 dt_object_put(env, o);
1065                 o = ERR_PTR(rc);
1066         }
1067
1068         RETURN(o);
1069 }
1070
1071 static void nodemap_save_all_caches(void)
1072 {
1073         struct nm_config_file   *ncf;
1074         struct lu_env            env;
1075         int                      rc = 0;
1076
1077         /* recreating nodemap cache requires fld_thread_key be in env */
1078         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1079         if (rc != 0) {
1080                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1081                 return;
1082         }
1083
1084         mutex_lock(&ncf_list_lock);
1085         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1086                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1087                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1088                 struct dt_object *o;
1089
1090                 /* put current config file so save conf can rewrite it */
1091                 dt_object_put_nocache(&env, ncf->ncf_obj);
1092                 ncf->ncf_obj = NULL;
1093
1094                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1095                 if (IS_ERR(o))
1096                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1097                               obd->obd_name, rc);
1098                 else
1099                         ncf->ncf_obj = o;
1100         }
1101         mutex_unlock(&ncf_list_lock);
1102
1103         lu_env_fini(&env);
1104 }
1105
1106 /* tracks if config still needs to be loaded, either from disk or network */
1107 static bool nodemap_config_loaded;
1108 static DEFINE_MUTEX(nodemap_config_loaded_lock);
1109
1110 /**
1111  * Ensures that configs loaded over the wire are prioritized over those loaded
1112  * from disk.
1113  *
1114  * \param config        config to set as the active config
1115  */
1116 void nodemap_config_set_active_mgc(struct nodemap_config *config)
1117 {
1118         mutex_lock(&nodemap_config_loaded_lock);
1119         nodemap_config_set_active(config);
1120         nodemap_config_loaded = true;
1121         nodemap_save_all_caches();
1122         mutex_unlock(&nodemap_config_loaded_lock);
1123 }
1124 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1125
1126 /**
1127  * Register a dt_object representing the config index file. This should be
1128  * called by targets in order to load the nodemap configuration from disk. The
1129  * dt_object should be created with local_index_find_or_create and the index
1130  * features should be enabled with do_index_try.
1131  *
1132  * \param obj   dt_object returned by local_index_find_or_create
1133  *
1134  * \retval      on success: nm_config_file handle for later deregistration
1135  * \retval      -ENOMEM         memory allocation failure
1136  * \retval      -ENOENT         error loading nodemap config
1137  * \retval      -EINVAL         error loading nodemap config
1138  * \retval      -EEXIST         nodemap config already registered for MGS
1139  */
1140 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1141                                                    struct dt_object *obj,
1142                                                    struct local_oid_storage *los)
1143 {
1144         struct nm_config_file *ncf;
1145         int rc = 0;
1146         ENTRY;
1147
1148         if (nodemap_mgs_ncf != NULL)
1149                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1150
1151         OBD_ALLOC_PTR(ncf);
1152         if (ncf == NULL)
1153                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1154
1155         /* if loading from cache, prevent activation of MGS config until cache
1156          * loading is done, so disk config is overwritten by MGS config.
1157          */
1158         mutex_lock(&nodemap_config_loaded_lock);
1159         rc = nodemap_load_entries(env, obj);
1160         if (!rc)
1161                 nodemap_config_loaded = true;
1162         mutex_unlock(&nodemap_config_loaded_lock);
1163
1164         if (rc) {
1165                 OBD_FREE_PTR(ncf);
1166                 GOTO(out, ncf = ERR_PTR(rc));
1167         }
1168
1169         lu_object_get(&obj->do_lu);
1170
1171         ncf->ncf_obj = obj;
1172         ncf->ncf_los = los;
1173
1174         nodemap_mgs_ncf = ncf;
1175
1176 out:
1177         return ncf;
1178 }
1179 EXPORT_SYMBOL(nm_config_file_register_mgs);
1180
1181 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1182                                                    struct dt_device *dev,
1183                                                    struct local_oid_storage *los)
1184 {
1185         struct nm_config_file *ncf;
1186         struct dt_object *config_obj = NULL;
1187         int rc = 0;
1188
1189         OBD_ALLOC_PTR(ncf);
1190         if (ncf == NULL)
1191                 RETURN(ERR_PTR(-ENOMEM));
1192
1193         /* don't load from cache if config already loaded */
1194         mutex_lock(&nodemap_config_loaded_lock);
1195         if (!nodemap_config_loaded) {
1196                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1197                 if (IS_ERR(config_obj))
1198                         rc = PTR_ERR(config_obj);
1199                 else
1200                         rc = nodemap_load_entries(env, config_obj);
1201
1202                 if (!rc)
1203                         nodemap_config_loaded = true;
1204         }
1205         mutex_unlock(&nodemap_config_loaded_lock);
1206         if (rc)
1207                 GOTO(out_ncf, rc);
1208
1209         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1210         if (!config_obj) {
1211                 config_obj = nodemap_save_config_cache(env, dev, los);
1212                 if (IS_ERR(config_obj))
1213                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1214         }
1215
1216         ncf->ncf_obj = config_obj;
1217         ncf->ncf_los = los;
1218
1219         mutex_lock(&ncf_list_lock);
1220         list_add(&ncf->ncf_list, &ncf_list_head);
1221         mutex_unlock(&ncf_list_lock);
1222
1223 out_ncf:
1224         if (rc) {
1225                 OBD_FREE_PTR(ncf);
1226                 RETURN(ERR_PTR(rc));
1227         }
1228
1229         RETURN(ncf);
1230 }
1231 EXPORT_SYMBOL(nm_config_file_register_tgt);
1232
1233 /**
1234  * Deregister a nm_config_file. Should be called by targets during cleanup.
1235  *
1236  * \param ncf   config file to deregister
1237  */
1238 void nm_config_file_deregister_mgs(const struct lu_env *env,
1239                                    struct nm_config_file *ncf)
1240 {
1241         ENTRY;
1242         LASSERT(nodemap_mgs_ncf == ncf);
1243
1244         nodemap_mgs_ncf = NULL;
1245         if (ncf->ncf_obj)
1246                 dt_object_put(env, ncf->ncf_obj);
1247
1248         OBD_FREE_PTR(ncf);
1249
1250         EXIT;
1251 }
1252 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1253
1254 void nm_config_file_deregister_tgt(const struct lu_env *env,
1255                                    struct nm_config_file *ncf)
1256 {
1257         ENTRY;
1258
1259         if (ncf == NULL)
1260                 return;
1261
1262         mutex_lock(&ncf_list_lock);
1263         list_del(&ncf->ncf_list);
1264         mutex_unlock(&ncf_list_lock);
1265
1266         if (ncf->ncf_obj)
1267                 dt_object_put(env, ncf->ncf_obj);
1268
1269         OBD_FREE_PTR(ncf);
1270
1271         EXIT;
1272 }
1273 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1274
1275 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1276                               struct lu_nodemap **recent_nodemap)
1277 {
1278         struct nodemap_key *key;
1279         union nodemap_rec *rec;
1280         char *entry;
1281         int j;
1282         int k;
1283         int rc = 0;
1284         int size = dt_nodemap_features.dif_keysize_max +
1285                    dt_nodemap_features.dif_recsize_max;
1286         ENTRY;
1287
1288         for (j = 0; j < LU_PAGE_COUNT; j++) {
1289                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1290                         return -EINVAL;
1291
1292                 /* get and process keys and records from page */
1293                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1294                         entry = lip->lp_idx.lip_entries + k * size;
1295                         key = (struct nodemap_key *)entry;
1296
1297                         entry += dt_nodemap_features.dif_keysize_max;
1298                         rec = (union nodemap_rec *)entry;
1299
1300                         rc = nodemap_process_keyrec(config, key, rec,
1301                                                     recent_nodemap);
1302                         if (rc < 0)
1303                                 return rc;
1304                 }
1305                 lip++;
1306         }
1307
1308         EXIT;
1309         return 0;
1310 }
1311 EXPORT_SYMBOL(nodemap_process_idx_pages);
1312
1313 static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
1314                               size_t nob, const struct dt_it_ops *iops,
1315                               struct dt_it *it, __u32 attr, void *arg)
1316 {
1317         struct idx_info *ii = (struct idx_info *)arg;
1318         struct lu_idxpage *lip = &lp->lp_idx;
1319         char *entry;
1320         size_t size = ii->ii_keysize + ii->ii_recsize;
1321         int rc;
1322         ENTRY;
1323
1324         if (nob < LIP_HDR_SIZE)
1325                 return -EINVAL;
1326
1327         /* initialize the header of the new container */
1328         memset(lip, 0, LIP_HDR_SIZE);
1329         lip->lip_magic = LIP_MAGIC;
1330         nob           -= LIP_HDR_SIZE;
1331
1332         entry = lip->lip_entries;
1333         do {
1334                 char            *tmp_entry = entry;
1335                 struct dt_key   *key;
1336                 __u64           hash;
1337                 enum nodemap_idx_type key_type;
1338
1339                 /* fetch 64-bit hash value */
1340                 hash = iops->store(env, it);
1341                 ii->ii_hash_end = hash;
1342
1343                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1344                         if (lip->lip_nr != 0)
1345                                 GOTO(out, rc = 0);
1346                 }
1347
1348                 if (nob < size) {
1349                         if (lip->lip_nr == 0)
1350                                 GOTO(out, rc = -EINVAL);
1351                         GOTO(out, rc = 0);
1352                 }
1353
1354                 key = iops->key(env, it);
1355                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1356
1357                 /* on the first pass, get only the cluster types. On second
1358                  * pass, get all the rest */
1359                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1360                                 key_type == NODEMAP_CLUSTER_IDX) ||
1361                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1362                                 key_type != NODEMAP_CLUSTER_IDX &&
1363                                 key_type != NODEMAP_EMPTY_IDX)) {
1364                         memcpy(tmp_entry, key, ii->ii_keysize);
1365                         tmp_entry += ii->ii_keysize;
1366
1367                         /* and finally the record */
1368                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1369                                        attr);
1370                         if (rc != -ESTALE) {
1371                                 if (rc != 0)
1372                                         GOTO(out, rc);
1373
1374                                 /* hash/key/record successfully copied! */
1375                                 lip->lip_nr++;
1376                                 if (unlikely(lip->lip_nr == 1 &&
1377                                     ii->ii_count == 0))
1378                                         ii->ii_hash_start = hash;
1379
1380                                 entry = tmp_entry + ii->ii_recsize;
1381                                 nob -= size;
1382                         }
1383                 }
1384
1385                 /* move on to the next record */
1386                 do {
1387                         rc = iops->next(env, it);
1388                 } while (rc == -ESTALE);
1389
1390                 /* move to second pass */
1391                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1392                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1393                         rc = iops->load(env, it, 0);
1394                         if (rc == 0)
1395                                 rc = iops->next(env, it);
1396                         else if (rc > 0)
1397                                 rc = 0;
1398                         else
1399                                 GOTO(out, rc);
1400                 }
1401
1402         } while (rc == 0);
1403
1404         GOTO(out, rc);
1405 out:
1406         if (rc >= 0 && lip->lip_nr > 0)
1407                 /* one more container */
1408                 ii->ii_count++;
1409         if (rc > 0)
1410                 /* no more entries */
1411                 ii->ii_hash_end = II_END_OFF;
1412         return rc;
1413 }
1414
1415
1416 int nodemap_index_read(struct lu_env *env,
1417                        struct nm_config_file *ncf,
1418                        struct idx_info *ii,
1419                        const struct lu_rdpg *rdpg)
1420 {
1421         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1422         __u64                    version;
1423         int                      rc = 0;
1424
1425         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1426         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1427
1428         dt_read_lock(env, nodemap_idx, 0);
1429         version = dt_version_get(env, nodemap_idx);
1430         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1431                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1432                        ii->ii_version,
1433                        version);
1434                 ii->ii_hash_end = 0;
1435         } else {
1436                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1437                                    ii);
1438                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1439         }
1440
1441         if (rc >= 0)
1442                 ii->ii_version = version;
1443
1444         dt_read_unlock(env, nodemap_idx);
1445         return rc;
1446 }
1447 EXPORT_SYMBOL(nodemap_index_read);
1448
1449 /**
1450  * Returns the current nodemap configuration to MGC by walking the nodemap
1451  * config index and storing it in the response buffer.
1452  *
1453  * \param       req             incoming MGS_CONFIG_READ request
1454  * \retval      0               success
1455  * \retval      -EINVAL         malformed request
1456  * \retval      -ENOTCONN       client evicted/reconnected already
1457  * \retval      -ETIMEDOUT      client timeout or network error
1458  * \retval      -ENOMEM
1459  */
1460 int nodemap_get_config_req(struct obd_device *mgs_obd,
1461                            struct ptlrpc_request *req)
1462 {
1463         struct mgs_config_body *body;
1464         struct mgs_config_res *res;
1465         struct lu_rdpg rdpg;
1466         struct idx_info nodemap_ii;
1467         struct ptlrpc_bulk_desc *desc;
1468         struct l_wait_info lwi;
1469         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1470         int i;
1471         int page_count;
1472         int bytes = 0;
1473         int rc = 0;
1474
1475         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1476         if (!body)
1477                 RETURN(-EINVAL);
1478
1479         if (body->mcb_type != CONFIG_T_NODEMAP)
1480                 RETURN(-EINVAL);
1481
1482         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1483         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1484                 PAGE_SHIFT;
1485         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1486                 RETURN(-EINVAL);
1487
1488         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1489                body->mcb_name, rdpg.rp_count);
1490
1491         /* allocate pages to store the containers */
1492         OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
1493         if (rdpg.rp_pages == NULL)
1494                 RETURN(-ENOMEM);
1495         for (i = 0; i < rdpg.rp_npages; i++) {
1496                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1497                 if (rdpg.rp_pages[i] == NULL)
1498                         GOTO(out, rc = -ENOMEM);
1499         }
1500
1501         rdpg.rp_hash = body->mcb_offset;
1502         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1503         nodemap_ii.ii_flags = II_FL_NOHASH;
1504         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1505         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1506
1507         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1508                                    mgs_obd->u.obt.obt_nodemap_config_file,
1509                                    &nodemap_ii, &rdpg);
1510         if (bytes < 0)
1511                 GOTO(out, rc = bytes);
1512
1513         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1514
1515         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1516         if (res == NULL)
1517                 GOTO(out, rc = -EINVAL);
1518         res->mcr_offset = nodemap_ii.ii_hash_end;
1519         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1520
1521         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1522         LASSERT(page_count <= rdpg.rp_count);
1523         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1524                                     PTLRPC_BULK_PUT_SOURCE |
1525                                         PTLRPC_BULK_BUF_KIOV,
1526                                     MGS_BULK_PORTAL,
1527                                     &ptlrpc_bulk_kiov_pin_ops);
1528         if (desc == NULL)
1529                 GOTO(out, rc = -ENOMEM);
1530
1531         for (i = 0; i < page_count && bytes > 0; i++) {
1532                 ptlrpc_prep_bulk_page_pin(desc, rdpg.rp_pages[i], 0,
1533                                           min_t(int, bytes, PAGE_SIZE));
1534                 bytes -= PAGE_SIZE;
1535         }
1536
1537         rc = target_bulk_io(req->rq_export, desc, &lwi);
1538         ptlrpc_free_bulk(desc);
1539
1540 out:
1541         if (rdpg.rp_pages != NULL) {
1542                 for (i = 0; i < rdpg.rp_npages; i++)
1543                         if (rdpg.rp_pages[i] != NULL)
1544                                 __free_page(rdpg.rp_pages[i]);
1545                 OBD_FREE(rdpg.rp_pages,
1546                          rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
1547         }
1548         return rc;
1549 }
1550 EXPORT_SYMBOL(nodemap_get_config_req);