Whamcloud - gitweb
LU-5092 nodemap: remove nodemap_idx_action, only act on MGS
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2014, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <lnet/types.h>
54 #include <lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
63 /* list of registered nodemap index files, except MGS */
64 static LIST_HEAD(ncf_list_head);
65 static DEFINE_MUTEX(ncf_list_lock);
66
67 /* MGS index is different than others, others are listeners to MGS idx */
68 static struct nm_config_file *nodemap_mgs_ncf;
69
70 /* lu_nodemap flags */
71 enum nm_flag_shifts {
72         NM_FL_ALLOW_ROOT_ACCESS = 0x1,
73         NM_FL_TRUST_CLIENT_IDS = 0x2,
74 };
75
76 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id)
77 {
78         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
79                                                         NODEMAP_CLUSTER_IDX));
80         nk->nk_unused = 0;
81 }
82
83 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
84                                      const struct lu_nodemap *nodemap)
85 {
86         CLASSERT(sizeof(nr->ncr.ncr_name) == sizeof(nodemap->nm_name));
87
88         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nodemap->nm_name));
89         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
90         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
91         nr->ncr.ncr_flags = cpu_to_le32(
92                 (nodemap->nmf_trust_client_ids ? NM_FL_TRUST_CLIENT_IDS : 0) |
93                 (nodemap->nmf_allow_root_access ? NM_FL_ALLOW_ROOT_ACCESS : 0));
94 }
95
96 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
97                                    enum nodemap_id_type id_type,
98                                    u32 id_client)
99 {
100         enum nodemap_idx_type idx_type;
101
102         if (id_type == NODEMAP_UID)
103                 idx_type = NODEMAP_UIDMAP_IDX;
104         else
105                 idx_type = NODEMAP_GIDMAP_IDX;
106
107         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
108         nk->nk_id_client = cpu_to_le32(id_client);
109 }
110
111 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
112 {
113         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
114 }
115
116 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
117                                    unsigned int rn_id)
118 {
119         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
120                                                         NODEMAP_RANGE_IDX));
121         nk->nk_range_id = cpu_to_le32(rn_id);
122 }
123
124 static void nodemap_range_rec_init(union nodemap_rec *nr,
125                                    const lnet_nid_t nid[2])
126 {
127         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
128         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
129 }
130
131 static void nodemap_global_key_init(struct nodemap_key *nk)
132 {
133         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
134         nk->nk_unused = 0;
135 }
136
137 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
138 {
139         nr->ngr.ngr_is_active = active;
140 }
141
142 /* should be called with dt_write lock */
143 static void nodemap_inc_version(const struct lu_env *env,
144                                 struct dt_object *nodemap_idx,
145                                 struct thandle *th)
146 {
147         u64 ver = dt_version_get(env, nodemap_idx);
148         dt_version_set(env, nodemap_idx, ver + 1, th);
149 }
150
151 static int nodemap_idx_insert(const struct lu_env *env,
152                               struct dt_object *idx,
153                               const struct nodemap_key *nk,
154                               const union nodemap_rec *nr)
155 {
156         struct thandle          *th;
157         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
158         int                      rc;
159
160         CLASSERT(sizeof(union nodemap_rec) == 32);
161
162         th = dt_trans_create(env, dev);
163
164         if (IS_ERR(th))
165                 GOTO(out, rc = PTR_ERR(th));
166
167         rc = dt_declare_insert(env, idx,
168                                (const struct dt_rec *)nr,
169                                (const struct dt_key *)nk, th);
170         if (rc != 0)
171                 GOTO(out, rc);
172
173         rc = dt_declare_version_set(env, idx, th);
174         if (rc != 0)
175                 GOTO(out, rc);
176
177         rc = dt_trans_start_local(env, dev, th);
178         if (rc != 0)
179                 GOTO(out, rc);
180
181         dt_write_lock(env, idx, 0);
182
183         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
184                        (const struct dt_key *)nk, th, 1);
185
186         nodemap_inc_version(env, idx, th);
187         dt_write_unlock(env, idx);
188 out:
189         dt_trans_stop(env, dev, th);
190
191         return rc;
192 }
193
194 static int nodemap_idx_update(const struct lu_env *env,
195                               struct dt_object *idx,
196                               const struct nodemap_key *nk,
197                               const union nodemap_rec *nr)
198 {
199         struct thandle          *th;
200         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
201         int                      rc = 0;
202
203         th = dt_trans_create(env, dev);
204
205         if (IS_ERR(th))
206                 GOTO(out, rc = PTR_ERR(th));
207
208         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
213                                (const struct dt_key *)nk, th);
214         if (rc != 0)
215                 GOTO(out, rc);
216
217         rc = dt_declare_version_set(env, idx, th);
218         if (rc != 0)
219                 GOTO(out, rc);
220
221         rc = dt_trans_start_local(env, dev, th);
222         if (rc != 0)
223                 GOTO(out, rc);
224
225         dt_write_lock(env, idx, 0);
226
227         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
228         if (rc != 0)
229                 GOTO(out_lock, rc);
230
231         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
232                        (const struct dt_key *)nk, th, 1);
233         if (rc != 0)
234                 GOTO(out_lock, rc);
235
236         nodemap_inc_version(env, idx, th);
237 out_lock:
238         dt_write_unlock(env, idx);
239 out:
240         dt_trans_stop(env, dev, th);
241
242         return rc;
243 }
244
245 static int nodemap_idx_delete(const struct lu_env *env,
246                               struct dt_object *idx,
247                               const struct nodemap_key *nk,
248                               const union nodemap_rec *unused)
249 {
250         struct thandle          *th;
251         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
252         int                      rc = 0;
253
254         th = dt_trans_create(env, dev);
255
256         if (IS_ERR(th))
257                 GOTO(out, rc = PTR_ERR(th));
258
259         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
260         if (rc != 0)
261                 GOTO(out, rc);
262
263         rc = dt_declare_version_set(env, idx, th);
264         if (rc != 0)
265                 GOTO(out, rc);
266
267         rc = dt_trans_start_local(env, dev, th);
268         if (rc != 0)
269                 GOTO(out, rc);
270
271         dt_write_lock(env, idx, 0);
272
273         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
274
275         nodemap_inc_version(env, idx, th);
276
277         dt_write_unlock(env, idx);
278 out:
279         dt_trans_stop(env, dev, th);
280
281         return rc;
282 }
283
284 enum nm_add_update {
285         NM_ADD = 0,
286         NM_UPDATE = 1,
287 };
288
289 static int nodemap_idx_nodemap_add_update(const struct lu_nodemap *nodemap,
290                                           enum nm_add_update update)
291 {
292         struct nodemap_key nk;
293         union nodemap_rec nr;
294         struct lu_env env;
295         int rc = 0;
296
297         ENTRY;
298
299         if (nodemap_mgs_ncf == NULL) {
300                 CERROR("cannot add nodemap config to non-existing MGS.\n");
301                 return -EINVAL;
302         }
303
304         rc = lu_env_init(&env, LCT_LOCAL);
305         if (rc)
306                 RETURN(rc);
307
308         nodemap_cluster_key_init(&nk, nodemap->nm_id);
309         nodemap_cluster_rec_init(&nr, nodemap);
310
311         if (update == NM_UPDATE)
312                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
313                                         &nk, &nr);
314         else
315                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
316                                         &nk, &nr);
317
318         lu_env_fini(&env);
319
320         RETURN(rc);
321 }
322
323 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
324 {
325         return nodemap_idx_nodemap_add_update(nodemap, NM_ADD);
326 }
327
328 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
329 {
330         return nodemap_idx_nodemap_add_update(nodemap, NM_UPDATE);
331 }
332
333 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
334 {
335         struct rb_root           root;
336         struct lu_idmap         *idmap;
337         struct lu_idmap         *temp;
338         struct lu_nid_range     *range;
339         struct lu_nid_range     *range_temp;
340         struct nodemap_key       nk;
341         struct lu_env            env;
342         int                      rc = 0;
343         int                      rc2 = 0;
344
345         ENTRY;
346
347         if (nodemap_mgs_ncf == NULL) {
348                 CERROR("cannot add nodemap config to non-existing MGS.\n");
349                 return -EINVAL;
350         }
351
352         rc = lu_env_init(&env, LCT_LOCAL);
353         if (rc != 0)
354                 RETURN(rc);
355
356         root = nodemap->nm_fs_to_client_uidmap;
357         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
358                                                 id_fs_to_client) {
359                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
360                                        idmap->id_client);
361                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
362                                          &nk, NULL);
363                 if (rc2 < 0)
364                         rc = rc2;
365         }
366
367         root = nodemap->nm_client_to_fs_gidmap;
368         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
369                                                 id_client_to_fs) {
370                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
371                                        idmap->id_client);
372                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
373                                          &nk, NULL);
374                 if (rc2 < 0)
375                         rc = rc2;
376         }
377
378         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
379                                  rn_list) {
380                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
381                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
382                                          &nk, NULL);
383                 if (rc2 < 0)
384                         rc = rc2;
385         }
386
387         nodemap_cluster_key_init(&nk, nodemap->nm_id);
388         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
389         if (rc2 < 0)
390                 rc = rc2;
391
392         lu_env_fini(&env);
393
394         RETURN(rc);
395 }
396
397 int nodemap_idx_range_add(const struct lu_nid_range *range,
398                           const lnet_nid_t nid[2])
399 {
400         struct nodemap_key       nk;
401         union nodemap_rec        nr;
402         struct lu_env            env;
403         int                      rc = 0;
404         ENTRY;
405
406         if (nodemap_mgs_ncf == NULL) {
407                 CERROR("cannot add nodemap config to non-existing MGS.\n");
408                 return -EINVAL;
409         }
410
411         rc = lu_env_init(&env, LCT_LOCAL);
412         if (rc != 0)
413                 RETURN(rc);
414
415         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
416         nodemap_range_rec_init(&nr, nid);
417
418         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
419         lu_env_fini(&env);
420
421         RETURN(rc);
422 }
423
424 int nodemap_idx_range_del(const struct lu_nid_range *range)
425 {
426         struct nodemap_key       nk;
427         struct lu_env            env;
428         int                      rc = 0;
429         ENTRY;
430
431         if (nodemap_mgs_ncf == NULL) {
432                 CERROR("cannot add nodemap config to non-existing MGS.\n");
433                 return -EINVAL;
434         }
435
436         rc = lu_env_init(&env, LCT_LOCAL);
437         if (rc != 0)
438                 RETURN(rc);
439
440         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
441
442         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
443         lu_env_fini(&env);
444
445         RETURN(rc);
446 }
447
448 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
449                           enum nodemap_id_type id_type,
450                           const u32 map[2])
451 {
452         struct nodemap_key       nk;
453         union nodemap_rec        nr;
454         struct lu_env            env;
455         int                      rc = 0;
456         ENTRY;
457
458         if (nodemap_mgs_ncf == NULL) {
459                 CERROR("cannot add nodemap config to non-existing MGS.\n");
460                 return -EINVAL;
461         }
462
463         rc = lu_env_init(&env, LCT_LOCAL);
464         if (rc != 0)
465                 RETURN(rc);
466
467         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
468         nodemap_idmap_rec_init(&nr, map[1]);
469
470         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
471         lu_env_fini(&env);
472
473         RETURN(rc);
474 }
475
476 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
477                           enum nodemap_id_type id_type,
478                           const u32 map[2])
479 {
480         struct nodemap_key       nk;
481         struct lu_env            env;
482         int                      rc = 0;
483         ENTRY;
484
485         if (nodemap_mgs_ncf == NULL) {
486                 CERROR("cannot add nodemap config to non-existing MGS.\n");
487                 return -EINVAL;
488         }
489
490         rc = lu_env_init(&env, LCT_LOCAL);
491         if (rc != 0)
492                 RETURN(rc);
493
494         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
495
496         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
497         lu_env_fini(&env);
498
499         RETURN(rc);
500 }
501
502 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
503 {
504         struct nodemap_key       nk;
505         union nodemap_rec        nr;
506         struct lu_env            env;
507         int                      rc = 0;
508         ENTRY;
509
510         if (nodemap_mgs_ncf == NULL) {
511                 CERROR("cannot add nodemap config to non-existing MGS.\n");
512                 return -EINVAL;
513         }
514
515         rc = lu_env_init(&env, LCT_LOCAL);
516         if (rc != 0)
517                 RETURN(rc);
518
519         nodemap_global_key_init(&nk);
520         nodemap_global_rec_init(&nr, value);
521
522         if (update == NM_UPDATE)
523                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
524                                         &nk, &nr);
525         else
526                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
527                                         &nk, &nr);
528
529         lu_env_fini(&env);
530
531         RETURN(rc);
532 }
533
534 int nodemap_idx_nodemap_activate(bool value)
535 {
536         return nodemap_idx_global_add_update(value, NM_UPDATE);
537 }
538
539 /**
540  * Process a key/rec pair and modify the new configuration.
541  *
542  * \param       config          configuration to update with this key/rec data
543  * \param       key             key of the record that was loaded
544  * \param       rec             record that was loaded
545  * \param       recent_nodemap  last referenced nodemap
546  * \retval      type of record processed, see enum #nodemap_idx_type
547  * \retval      -ENOENT         range or map loaded before nodemap record
548  * \retval      -EINVAL         duplicate nodemap cluster records found with
549  *                              different IDs, or nodemap has invalid name
550  * \retval      -ENOMEM
551  */
552 static int nodemap_process_keyrec(struct nodemap_config *config,
553                                   const struct nodemap_key *key,
554                                   const union nodemap_rec *rec,
555                                   struct lu_nodemap **recent_nodemap)
556 {
557         struct lu_nodemap       *nodemap = NULL;
558         enum nodemap_idx_type    type;
559         enum nodemap_id_type     id_type;
560         u8                       flags;
561         u32                      nodemap_id;
562         lnet_nid_t               nid[2];
563         u32                      map[2];
564         int                      rc;
565
566         CLASSERT(sizeof(union nodemap_rec) == 32);
567
568         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
569         type = nm_idx_get_type(nodemap_id);
570         nodemap_id = nm_idx_set_type(nodemap_id, 0);
571
572         CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
573                nodemap_id, type);
574
575         /* find the correct nodemap in the load list */
576         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
577             type == NODEMAP_GIDMAP_IDX) {
578                 struct lu_nodemap *tmp = NULL;
579
580                 nodemap = *recent_nodemap;
581
582                 if (nodemap == NULL)
583                         GOTO(out, rc = -ENOENT);
584
585                 if (nodemap->nm_id != nodemap_id) {
586                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
587                                 if (tmp->nm_id == nodemap_id) {
588                                         nodemap = tmp;
589                                         break;
590                                 }
591
592                         if (nodemap->nm_id != nodemap_id)
593                                 GOTO(out, rc = -ENOENT);
594                 }
595
596                 /* update most recently used nodemap if necessay */
597                 if (nodemap != *recent_nodemap)
598                         *recent_nodemap = nodemap;
599         }
600
601         switch (type) {
602         case NODEMAP_EMPTY_IDX:
603                 if (nodemap_id != 0)
604                         CWARN("Found nodemap config record without type field, "
605                               " nodemap_id=%d. nodemap config file corrupt?\n",
606                               nodemap_id);
607                 break;
608         case NODEMAP_CLUSTER_IDX:
609                 nodemap = cfs_hash_lookup(config->nmc_nodemap_hash,
610                                           rec->ncr.ncr_name);
611                 if (nodemap == NULL) {
612                         if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID) {
613                                 nodemap = nodemap_create(rec->ncr.ncr_name,
614                                                          config, 1);
615                                 config->nmc_default_nodemap = nodemap;
616                         } else {
617                                 nodemap = nodemap_create(rec->ncr.ncr_name,
618                                                          config, 0);
619                         }
620                         if (IS_ERR(nodemap))
621                                 GOTO(out, rc = PTR_ERR(nodemap));
622
623                         /* we need to override the local ID with the saved ID */
624                         nodemap->nm_id = nodemap_id;
625                         if (nodemap_id > config->nmc_nodemap_highest_id)
626                                 config->nmc_nodemap_highest_id = nodemap_id;
627
628                 } else if (nodemap->nm_id != nodemap_id) {
629                         nodemap_putref(nodemap);
630                         GOTO(out, rc = -EINVAL);
631                 }
632
633                 nodemap->nm_squash_uid =
634                                 le32_to_cpu(rec->ncr.ncr_squash_uid);
635                 nodemap->nm_squash_gid =
636                                 le32_to_cpu(rec->ncr.ncr_squash_gid);
637
638                 flags = le32_to_cpu(rec->ncr.ncr_flags);
639                 nodemap->nmf_allow_root_access =
640                                         flags & NM_FL_ALLOW_ROOT_ACCESS;
641                 nodemap->nmf_trust_client_ids =
642                                         flags & NM_FL_TRUST_CLIENT_IDS;
643
644                 if (*recent_nodemap == NULL) {
645                         *recent_nodemap = nodemap;
646                         INIT_LIST_HEAD(&nodemap->nm_list);
647                 } else {
648                         list_add(&nodemap->nm_list,
649                                  &(*recent_nodemap)->nm_list);
650                 }
651                 nodemap_putref(nodemap);
652                 break;
653         case NODEMAP_RANGE_IDX:
654                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
655                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
656
657                 rc = nodemap_add_range_helper(config, nodemap, nid,
658                                         le32_to_cpu(key->nk_range_id));
659                 if (rc != 0)
660                         GOTO(out, rc);
661                 break;
662         case NODEMAP_UIDMAP_IDX:
663         case NODEMAP_GIDMAP_IDX:
664                 map[0] = le32_to_cpu(key->nk_id_client);
665                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
666
667                 if (type == NODEMAP_UIDMAP_IDX)
668                         id_type = NODEMAP_UID;
669                 else
670                         id_type = NODEMAP_GID;
671
672                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
673                 if (rc != 0)
674                         GOTO(out, rc);
675                 break;
676         case NODEMAP_GLOBAL_IDX:
677                 config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
678                 break;
679         default:
680                 CERROR("got keyrec pair for unknown type %d\n", type);
681                 break;
682         }
683
684         rc = type;
685
686 out:
687         return rc;
688 }
689
690 static int nodemap_load_entries(const struct lu_env *env,
691                                 struct dt_object *nodemap_idx)
692 {
693         const struct dt_it_ops  *iops;
694         struct dt_it            *it;
695         struct lu_nodemap       *recent_nodemap = NULL;
696         struct nodemap_config   *new_config = NULL;
697         u64                      hash = 0;
698         bool                     activate_nodemap = false;
699         bool                     loaded_global_idx = false;
700         int                      rc = 0;
701
702         ENTRY;
703
704         iops = &nodemap_idx->do_index_ops->dio_it;
705
706         dt_read_lock(env, nodemap_idx, 0);
707         it = iops->init(env, nodemap_idx, 0);
708         if (IS_ERR(it))
709                 GOTO(out, rc = PTR_ERR(it));
710
711         rc = iops->load(env, it, hash);
712         if (rc == 0) {
713                 rc = iops->next(env, it);
714                 if (rc != 0)
715                         GOTO(out_iops, rc = 0);
716         }
717
718         new_config = nodemap_config_alloc();
719         if (IS_ERR(new_config)) {
720                 rc = PTR_ERR(new_config);
721                 new_config = NULL;
722                 GOTO(out_lock, rc);
723         }
724
725         do {
726                 struct nodemap_key *key;
727                 union nodemap_rec rec;
728
729                 key = (struct nodemap_key *)iops->key(env, it);
730                 rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
731                 if (rc != -ESTALE) {
732                         if (rc != 0)
733                                 GOTO(out_lock, rc);
734                         rc = nodemap_process_keyrec(new_config, key, &rec,
735                                                     &recent_nodemap);
736                         if (rc < 0)
737                                 GOTO(out_lock, rc);
738                         if (rc == NODEMAP_GLOBAL_IDX)
739                                 loaded_global_idx = true;
740                 }
741
742                 do
743                         rc = iops->next(env, it);
744                 while (rc == -ESTALE);
745         } while (rc == 0);
746
747         if (rc > 0)
748                 rc = 0;
749
750 out_lock:
751         if (rc != 0)
752                 nodemap_config_dealloc(new_config);
753         else
754                 /* creating new default needs to be done outside dt read lock */
755                 activate_nodemap = true;
756 out_iops:
757         iops->put(env, it);
758         iops->fini(env, it);
759 out:
760         dt_read_unlock(env, nodemap_idx);
761
762         if (rc != 0)
763                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
764                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
765
766         if (!activate_nodemap)
767                 RETURN(rc);
768
769         if (new_config->nmc_default_nodemap == NULL) {
770                 /* new MGS won't have a default nm on disk, so create it here */
771                 new_config->nmc_default_nodemap =
772                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
773                 if (IS_ERR(new_config->nmc_default_nodemap)) {
774                         rc = PTR_ERR(new_config->nmc_default_nodemap);
775                 } else {
776                         rc = nodemap_idx_nodemap_add_update(
777                                         new_config->nmc_default_nodemap,
778                                         NM_ADD);
779                         nodemap_putref(new_config->nmc_default_nodemap);
780                 }
781         }
782
783         /* new nodemap config won't have an active/inactive record */
784         if (rc == 0 && loaded_global_idx == false) {
785                 struct nodemap_key       nk;
786                 union nodemap_rec        nr;
787
788                 nodemap_global_key_init(&nk);
789                 nodemap_global_rec_init(&nr, false);
790                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
791         }
792
793         if (rc == 0)
794                 nodemap_config_set_active(new_config);
795         else
796                 nodemap_config_dealloc(new_config);
797
798         RETURN(rc);
799 }
800
801 /* tracks if config still needs to be loaded, either from disk or network */
802 static bool nodemap_config_loaded;
803 static DEFINE_MUTEX(nodemap_config_loaded_lock);
804
805 /**
806  * Ensures that configs loaded over the wire are prioritized over those loaded
807  * from disk.
808  *
809  * \param config        config to set as the active config
810  */
811 void nodemap_config_set_active_mgc(struct nodemap_config *config)
812 {
813         mutex_lock(&nodemap_config_loaded_lock);
814         nodemap_config_set_active(config);
815         nodemap_config_loaded = true;
816         mutex_unlock(&nodemap_config_loaded_lock);
817 }
818 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
819
820 /**
821  * Register a dt_object representing the config index file. This should be
822  * called by targets in order to load the nodemap configuration from disk. The
823  * dt_object should be created with local_index_find_or_create and the index
824  * features should be enabled with do_index_try.
825  *
826  * \param obj   dt_object returned by local_index_find_or_create
827  *
828  * \retval      on success: nm_config_file handle for later deregistration
829  * \retval      -ENOMEM         memory allocation failure
830  * \retval      -ENOENT         error loading nodemap config
831  * \retval      -EINVAL         error loading nodemap config
832  */
833 struct nm_config_file *nm_config_file_register(const struct lu_env *env,
834                                                struct dt_object *obj,
835                                                struct local_oid_storage *los,
836                                                enum nm_config_file_type ncf_type)
837 {
838         struct nm_config_file *ncf;
839         int rc = 0;
840         ENTRY;
841
842         OBD_ALLOC_PTR(ncf);
843         if (ncf == NULL)
844                 RETURN(ERR_PTR(-ENOMEM));
845
846         ncf->ncf_obj = obj;
847         ncf->ncf_los = los;
848
849         if (ncf_type == NCFT_MGS) {
850                 nodemap_mgs_ncf = ncf;
851         } else {
852                 mutex_lock(&ncf_list_lock);
853                 list_add(&ncf->ncf_list, &ncf_list_head);
854                 mutex_unlock(&ncf_list_lock);
855         }
856
857         /* prevent activation of config loaded from MGS until disk is loaded
858          * so disk config is overwritten by MGS config.
859          */
860         mutex_lock(&nodemap_config_loaded_lock);
861         if (ncf_type == NCFT_MGS || !nodemap_config_loaded)
862                 rc = nodemap_load_entries(env, obj);
863         nodemap_config_loaded = true;
864         mutex_unlock(&nodemap_config_loaded_lock);
865
866         if (rc < 0) {
867                 if (ncf_type == NCFT_MGS) {
868                         nodemap_mgs_ncf = NULL;
869                 } else {
870                         mutex_lock(&ncf_list_lock);
871                         list_del(&ncf->ncf_list);
872                         mutex_unlock(&ncf_list_lock);
873                 }
874
875                 OBD_FREE_PTR(ncf);
876                 RETURN(ERR_PTR(rc));
877         }
878
879         RETURN(ncf);
880 }
881 EXPORT_SYMBOL(nm_config_file_register);
882
883 /**
884  * Deregister a nm_config_file. Should be called by targets during cleanup.
885  *
886  * \param ncf   config file to deregister
887  */
888 void nm_config_file_deregister(const struct lu_env *env,
889                                struct nm_config_file *ncf,
890                                enum nm_config_file_type ncf_type)
891 {
892         ENTRY;
893
894         if (ncf->ncf_obj)
895                 lu_object_put(env, &ncf->ncf_obj->do_lu);
896
897         if (ncf_type == NCFT_TGT) {
898                 mutex_lock(&ncf_list_lock);
899                 list_del(&ncf->ncf_list);
900                 mutex_unlock(&ncf_list_lock);
901         } else {
902                 nodemap_mgs_ncf = NULL;
903         }
904         OBD_FREE_PTR(ncf);
905
906         EXIT;
907 }
908 EXPORT_SYMBOL(nm_config_file_deregister);
909
910 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
911                               struct lu_nodemap **recent_nodemap)
912 {
913         struct nodemap_key *key;
914         union nodemap_rec *rec;
915         char *entry;
916         int j;
917         int k;
918         int rc = 0;
919         int size = dt_nodemap_features.dif_keysize_max +
920                    dt_nodemap_features.dif_recsize_max;
921         ENTRY;
922
923         for (j = 0; j < LU_PAGE_COUNT; j++) {
924                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
925                         return -EINVAL;
926
927                 /* get and process keys and records from page */
928                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
929                         entry = lip->lp_idx.lip_entries + k * size;
930                         key = (struct nodemap_key *)entry;
931
932                         entry += dt_nodemap_features.dif_keysize_max;
933                         rec = (union nodemap_rec *)entry;
934
935                         rc = nodemap_process_keyrec(config, key, rec,
936                                                     recent_nodemap);
937                         if (rc < 0)
938                                 return rc;
939                 }
940                 lip++;
941         }
942
943         EXIT;
944         return 0;
945 }
946 EXPORT_SYMBOL(nodemap_process_idx_pages);
947
948 int nodemap_index_read(struct lu_env *env,
949                        struct nm_config_file *ncf,
950                        struct idx_info *ii,
951                        const struct lu_rdpg *rdpg)
952 {
953         struct dt_object        *nodemap_idx = ncf->ncf_obj;
954         __u64                    version;
955         int                      rc = 0;
956
957         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
958         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
959
960         dt_read_lock(env, nodemap_idx, 0);
961         version = dt_version_get(env, nodemap_idx);
962         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
963                 CDEBUG(D_INFO, "nodemap config changed while sending, "
964                                "old "LPU64", new "LPU64"\n",
965                        ii->ii_version,
966                        version);
967                 ii->ii_hash_end = 0;
968         } else {
969                 rc = dt_index_walk(env, nodemap_idx, rdpg, NULL, ii);
970                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
971         }
972
973         if (rc >= 0)
974                 ii->ii_version = version;
975
976         dt_read_unlock(env, nodemap_idx);
977         return rc;
978 }
979 EXPORT_SYMBOL(nodemap_index_read);
980
981 /**
982  * Returns the current nodemap configuration to MGC by walking the nodemap
983  * config index and storing it in the response buffer.
984  *
985  * \param       req             incoming MGS_CONFIG_READ request
986  * \retval      0               success
987  * \retval      -EINVAL         malformed request
988  * \retval      -ENOTCONN       client evicted/reconnected already
989  * \retval      -ETIMEDOUT      client timeout or network error
990  * \retval      -ENOMEM
991  */
992 int nodemap_get_config_req(struct obd_device *mgs_obd,
993                            struct ptlrpc_request *req)
994 {
995         struct mgs_config_body *body;
996         struct mgs_config_res *res;
997         struct lu_rdpg rdpg;
998         struct idx_info nodemap_ii;
999         struct ptlrpc_bulk_desc *desc;
1000         struct l_wait_info lwi;
1001         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1002         int i;
1003         int page_count;
1004         int bytes = 0;
1005         int rc = 0;
1006
1007         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1008         if (!body)
1009                 RETURN(-EINVAL);
1010
1011         if (body->mcb_type != CONFIG_T_NODEMAP)
1012                 RETURN(-EINVAL);
1013
1014         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1015         rdpg.rp_npages = (rdpg.rp_count + PAGE_CACHE_SIZE - 1) >>
1016                 PAGE_CACHE_SHIFT;
1017         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1018                 RETURN(-EINVAL);
1019
1020         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1021                body->mcb_name, rdpg.rp_count);
1022
1023         /* allocate pages to store the containers */
1024         OBD_ALLOC(rdpg.rp_pages, sizeof(*rdpg.rp_pages) * rdpg.rp_npages);
1025         if (rdpg.rp_pages == NULL)
1026                 RETURN(-ENOMEM);
1027         for (i = 0; i < rdpg.rp_npages; i++) {
1028                 rdpg.rp_pages[i] = alloc_page(GFP_IOFS);
1029                 if (rdpg.rp_pages[i] == NULL)
1030                         GOTO(out, rc = -ENOMEM);
1031         }
1032
1033         rdpg.rp_hash = body->mcb_offset;
1034         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1035         nodemap_ii.ii_flags = II_FL_NOHASH;
1036         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1037
1038         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1039                                    mgs_obd->u.obt.obt_nodemap_config_file,
1040                                    &nodemap_ii, &rdpg);
1041         if (bytes < 0)
1042                 GOTO(out, rc = bytes);
1043
1044         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1045
1046         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1047         if (res == NULL)
1048                 GOTO(out, rc = -EINVAL);
1049         res->mcr_offset = nodemap_ii.ii_hash_end;
1050         res->mcr_size = bytes;
1051
1052         page_count = (bytes + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
1053         LASSERT(page_count <= rdpg.rp_count);
1054         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1055                                     PTLRPC_BULK_PUT_SOURCE |
1056                                         PTLRPC_BULK_BUF_KIOV,
1057                                     MGS_BULK_PORTAL,
1058                                     &ptlrpc_bulk_kiov_pin_ops);
1059         if (desc == NULL)
1060                 GOTO(out, rc = -ENOMEM);
1061
1062         for (i = 0; i < page_count && bytes > 0; i++) {
1063                 ptlrpc_prep_bulk_page_pin(desc, rdpg.rp_pages[i], 0,
1064                                           min_t(int, bytes, PAGE_CACHE_SIZE));
1065                 bytes -= PAGE_CACHE_SIZE;
1066         }
1067
1068         rc = target_bulk_io(req->rq_export, desc, &lwi);
1069         ptlrpc_free_bulk(desc);
1070
1071 out:
1072         if (rdpg.rp_pages != NULL) {
1073                 for (i = 0; i < rdpg.rp_npages; i++)
1074                         if (rdpg.rp_pages[i] != NULL)
1075                                 __free_page(rdpg.rp_pages[i]);
1076                 OBD_FREE(rdpg.rp_pages,
1077                          rdpg.rp_npages * sizeof(rdpg.rp_pages[0]));
1078         }
1079         return rc;
1080 }
1081 EXPORT_SYMBOL(nodemap_get_config_req);