Whamcloud - gitweb
LU-9405 utils: remove device path parsing from mount.lustre
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
57
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks, int debug_level);
67
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
71
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74
75 /*
76  * support functions: we could use inter-module communication, but this
77  * is more portable to other OS's
78  */
79 static struct obd_device *obd_device_alloc(void)
80 {
81         struct obd_device *obd;
82
83         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84         if (obd != NULL) {
85                 obd->obd_magic = OBD_DEVICE_MAGIC;
86         }
87         return obd;
88 }
89
90 static void obd_device_free(struct obd_device *obd)
91 {
92         LASSERT(obd != NULL);
93         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95         if (obd->obd_namespace != NULL) {
96                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97                        obd, obd->obd_namespace, obd->obd_force);
98                 LBUG();
99         }
100         lu_ref_fini(&obd->obd_reference);
101         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
102 }
103
104 struct obd_type *class_search_type(const char *name)
105 {
106         struct list_head *tmp;
107         struct obd_type *type;
108
109         spin_lock(&obd_types_lock);
110         list_for_each(tmp, &obd_types) {
111                 type = list_entry(tmp, struct obd_type, typ_chain);
112                 if (strcmp(type->typ_name, name) == 0) {
113                         spin_unlock(&obd_types_lock);
114                         return type;
115                 }
116         }
117         spin_unlock(&obd_types_lock);
118         return NULL;
119 }
120 EXPORT_SYMBOL(class_search_type);
121
122 struct obd_type *class_get_type(const char *name)
123 {
124         struct obd_type *type = class_search_type(name);
125
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
127         if (!type) {
128                 const char *modname = name;
129
130                 if (strcmp(modname, "obdfilter") == 0)
131                         modname = "ofd";
132
133                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134                         modname = LUSTRE_OSP_NAME;
135
136                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137                         modname = LUSTRE_MDT_NAME;
138
139                 if (!request_module("%s", modname)) {
140                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141                         type = class_search_type(name);
142                 } else {
143                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144                                            modname);
145                 }
146         }
147 #endif
148         if (type) {
149                 spin_lock(&type->obd_type_lock);
150                 type->typ_refcnt++;
151                 try_module_get(type->typ_dt_ops->o_owner);
152                 spin_unlock(&type->obd_type_lock);
153         }
154         return type;
155 }
156
157 void class_put_type(struct obd_type *type)
158 {
159         LASSERT(type);
160         spin_lock(&type->obd_type_lock);
161         type->typ_refcnt--;
162         module_put(type->typ_dt_ops->o_owner);
163         spin_unlock(&type->obd_type_lock);
164 }
165
166 #define CLASS_MAX_NAME 1024
167
168 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
169                         bool enable_proc, struct lprocfs_vars *vars,
170                         const char *name, struct lu_device_type *ldt)
171 {
172         struct obd_type *type;
173         int rc = 0;
174         ENTRY;
175
176         /* sanity check */
177         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178
179         if (class_search_type(name)) {
180                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
181                 RETURN(-EEXIST);
182         }
183
184         rc = -ENOMEM;
185         OBD_ALLOC(type, sizeof(*type));
186         if (type == NULL)
187                 RETURN(rc);
188
189         OBD_ALLOC_PTR(type->typ_dt_ops);
190         OBD_ALLOC_PTR(type->typ_md_ops);
191         OBD_ALLOC(type->typ_name, strlen(name) + 1);
192
193         if (type->typ_dt_ops == NULL ||
194             type->typ_md_ops == NULL ||
195             type->typ_name == NULL)
196                 GOTO (failed, rc);
197
198         *(type->typ_dt_ops) = *dt_ops;
199         /* md_ops is optional */
200         if (md_ops)
201                 *(type->typ_md_ops) = *md_ops;
202         strcpy(type->typ_name, name);
203         spin_lock_init(&type->obd_type_lock);
204
205 #ifdef CONFIG_PROC_FS
206         if (enable_proc) {
207                 type->typ_procroot = lprocfs_register(type->typ_name,
208                                                       proc_lustre_root,
209                                                       vars, type);
210                 if (IS_ERR(type->typ_procroot)) {
211                         rc = PTR_ERR(type->typ_procroot);
212                         type->typ_procroot = NULL;
213                         GOTO(failed, rc);
214                 }
215         }
216 #endif
217         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
218         if (!type->typ_kobj) {
219                 rc = -ENOMEM;
220                 GOTO(failed, rc);
221         }
222
223         if (ldt != NULL) {
224                 type->typ_lu = ldt;
225                 rc = lu_device_type_init(ldt);
226                 if (rc != 0)
227                         GOTO (failed, rc);
228         }
229
230         spin_lock(&obd_types_lock);
231         list_add(&type->typ_chain, &obd_types);
232         spin_unlock(&obd_types_lock);
233
234         RETURN (0);
235
236 failed:
237         if (type->typ_kobj)
238                 kobject_put(type->typ_kobj);
239         if (type->typ_name != NULL) {
240 #ifdef CONFIG_PROC_FS
241                 if (type->typ_procroot != NULL)
242                         remove_proc_subtree(type->typ_name, proc_lustre_root);
243 #endif
244                 OBD_FREE(type->typ_name, strlen(name) + 1);
245         }
246         if (type->typ_md_ops != NULL)
247                 OBD_FREE_PTR(type->typ_md_ops);
248         if (type->typ_dt_ops != NULL)
249                 OBD_FREE_PTR(type->typ_dt_ops);
250         OBD_FREE(type, sizeof(*type));
251         RETURN(rc);
252 }
253 EXPORT_SYMBOL(class_register_type);
254
255 int class_unregister_type(const char *name)
256 {
257         struct obd_type *type = class_search_type(name);
258         ENTRY;
259
260         if (!type) {
261                 CERROR("unknown obd type\n");
262                 RETURN(-EINVAL);
263         }
264
265         if (type->typ_refcnt) {
266                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
267                 /* This is a bad situation, let's make the best of it */
268                 /* Remove ops, but leave the name for debugging */
269                 OBD_FREE_PTR(type->typ_dt_ops);
270                 OBD_FREE_PTR(type->typ_md_ops);
271                 RETURN(-EBUSY);
272         }
273
274         if (type->typ_kobj)
275                 kobject_put(type->typ_kobj);
276
277         /* we do not use type->typ_procroot as for compatibility purposes
278          * other modules can share names (i.e. lod can use lov entry). so
279          * we can't reference pointer as it can get invalided when another
280          * module removes the entry */
281 #ifdef CONFIG_PROC_FS
282         if (type->typ_procroot != NULL)
283                 remove_proc_subtree(type->typ_name, proc_lustre_root);
284         if (type->typ_procsym != NULL)
285                 lprocfs_remove(&type->typ_procsym);
286 #endif
287         if (type->typ_lu)
288                 lu_device_type_fini(type->typ_lu);
289
290         spin_lock(&obd_types_lock);
291         list_del(&type->typ_chain);
292         spin_unlock(&obd_types_lock);
293         OBD_FREE(type->typ_name, strlen(name) + 1);
294         if (type->typ_dt_ops != NULL)
295                 OBD_FREE_PTR(type->typ_dt_ops);
296         if (type->typ_md_ops != NULL)
297                 OBD_FREE_PTR(type->typ_md_ops);
298         OBD_FREE(type, sizeof(*type));
299         RETURN(0);
300 } /* class_unregister_type */
301 EXPORT_SYMBOL(class_unregister_type);
302
303 /**
304  * Create a new obd device.
305  *
306  * Allocate the new obd_device and initialize it.
307  *
308  * \param[in] type_name obd device type string.
309  * \param[in] name      obd device name.
310  * \param[in] uuid      obd device UUID
311  *
312  * \retval newdev         pointer to created obd_device
313  * \retval ERR_PTR(errno) on error
314  */
315 struct obd_device *class_newdev(const char *type_name, const char *name,
316                                 const char *uuid)
317 {
318         struct obd_device *newdev;
319         struct obd_type *type = NULL;
320         ENTRY;
321
322         if (strlen(name) >= MAX_OBD_NAME) {
323                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324                 RETURN(ERR_PTR(-EINVAL));
325         }
326
327         type = class_get_type(type_name);
328         if (type == NULL){
329                 CERROR("OBD: unknown type: %s\n", type_name);
330                 RETURN(ERR_PTR(-ENODEV));
331         }
332
333         newdev = obd_device_alloc();
334         if (newdev == NULL) {
335                 class_put_type(type);
336                 RETURN(ERR_PTR(-ENOMEM));
337         }
338         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
339         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
340         newdev->obd_type = type;
341         newdev->obd_minor = -1;
342
343         rwlock_init(&newdev->obd_pool_lock);
344         newdev->obd_pool_limit = 0;
345         newdev->obd_pool_slv = 0;
346
347         INIT_LIST_HEAD(&newdev->obd_exports);
348         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
349         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
350         INIT_LIST_HEAD(&newdev->obd_exports_timed);
351         INIT_LIST_HEAD(&newdev->obd_nid_stats);
352         spin_lock_init(&newdev->obd_nid_lock);
353         spin_lock_init(&newdev->obd_dev_lock);
354         mutex_init(&newdev->obd_dev_mutex);
355         spin_lock_init(&newdev->obd_osfs_lock);
356         /* newdev->obd_osfs_age must be set to a value in the distant
357          * past to guarantee a fresh statfs is fetched on mount. */
358         newdev->obd_osfs_age = cfs_time_shift_64(-1000);
359
360         /* XXX belongs in setup not attach  */
361         init_rwsem(&newdev->obd_observer_link_sem);
362         /* recovery data */
363         init_timer(&newdev->obd_recovery_timer);
364         spin_lock_init(&newdev->obd_recovery_task_lock);
365         init_waitqueue_head(&newdev->obd_next_transno_waitq);
366         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
367         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
368         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
369         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
370         INIT_LIST_HEAD(&newdev->obd_evict_list);
371         INIT_LIST_HEAD(&newdev->obd_lwp_list);
372
373         llog_group_init(&newdev->obd_olg);
374         /* Detach drops this */
375         atomic_set(&newdev->obd_refcount, 1);
376         lu_ref_init(&newdev->obd_reference);
377         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
378
379         newdev->obd_conn_inprogress = 0;
380
381         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
382
383         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
384                newdev->obd_name, newdev);
385
386         return newdev;
387 }
388
389 /**
390  * Free obd device.
391  *
392  * \param[in] obd obd_device to be freed
393  *
394  * \retval none
395  */
396 void class_free_dev(struct obd_device *obd)
397 {
398         struct obd_type *obd_type = obd->obd_type;
399
400         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
401                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
402         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
403                  "obd %p != obd_devs[%d] %p\n",
404                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
405         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
406                  "obd_refcount should be 0, not %d\n",
407                  atomic_read(&obd->obd_refcount));
408         LASSERT(obd_type != NULL);
409
410         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
411                obd->obd_name, obd->obd_type->typ_name);
412
413         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
414                          obd->obd_name, obd->obd_uuid.uuid);
415         if (obd->obd_stopping) {
416                 int err;
417
418                 /* If we're not stopping, we were never set up */
419                 err = obd_cleanup(obd);
420                 if (err)
421                         CERROR("Cleanup %s returned %d\n",
422                                 obd->obd_name, err);
423         }
424
425         obd_device_free(obd);
426
427         class_put_type(obd_type);
428 }
429
430 /**
431  * Unregister obd device.
432  *
433  * Free slot in obd_dev[] used by \a obd.
434  *
435  * \param[in] new_obd obd_device to be unregistered
436  *
437  * \retval none
438  */
439 void class_unregister_device(struct obd_device *obd)
440 {
441         write_lock(&obd_dev_lock);
442         if (obd->obd_minor >= 0) {
443                 LASSERT(obd_devs[obd->obd_minor] == obd);
444                 obd_devs[obd->obd_minor] = NULL;
445                 obd->obd_minor = -1;
446         }
447         write_unlock(&obd_dev_lock);
448 }
449
450 /**
451  * Register obd device.
452  *
453  * Find free slot in obd_devs[], fills it with \a new_obd.
454  *
455  * \param[in] new_obd obd_device to be registered
456  *
457  * \retval 0          success
458  * \retval -EEXIST    device with this name is registered
459  * \retval -EOVERFLOW obd_devs[] is full
460  */
461 int class_register_device(struct obd_device *new_obd)
462 {
463         int ret = 0;
464         int i;
465         int new_obd_minor = 0;
466         bool minor_assign = false;
467
468         write_lock(&obd_dev_lock);
469         for (i = 0; i < class_devno_max(); i++) {
470                 struct obd_device *obd = class_num2obd(i);
471
472                 if (obd != NULL &&
473                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
474                         CERROR("%s: already exists, won't add\n",
475                                obd->obd_name);
476                         /* in case we found a free slot before duplicate */
477                         minor_assign = false;
478                         ret = -EEXIST;
479                         break;
480                 }
481                 if (!minor_assign && obd == NULL) {
482                         new_obd_minor = i;
483                         minor_assign = true;
484                 }
485         }
486
487         if (minor_assign) {
488                 new_obd->obd_minor = new_obd_minor;
489                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
490                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
491                 obd_devs[new_obd_minor] = new_obd;
492         } else {
493                 if (ret == 0) {
494                         ret = -EOVERFLOW;
495                         CERROR("%s: all %u/%u devices used, increase "
496                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
497                                i, class_devno_max(), ret);
498                 }
499         }
500         write_unlock(&obd_dev_lock);
501
502         RETURN(ret);
503 }
504
505 static int class_name2dev_nolock(const char *name)
506 {
507         int i;
508
509         if (!name)
510                 return -1;
511
512         for (i = 0; i < class_devno_max(); i++) {
513                 struct obd_device *obd = class_num2obd(i);
514
515                 if (obd && strcmp(name, obd->obd_name) == 0) {
516                         /* Make sure we finished attaching before we give
517                            out any references */
518                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
519                         if (obd->obd_attached) {
520                                 return i;
521                         }
522                         break;
523                 }
524         }
525
526         return -1;
527 }
528
529 int class_name2dev(const char *name)
530 {
531         int i;
532
533         if (!name)
534                 return -1;
535
536         read_lock(&obd_dev_lock);
537         i = class_name2dev_nolock(name);
538         read_unlock(&obd_dev_lock);
539
540         return i;
541 }
542 EXPORT_SYMBOL(class_name2dev);
543
544 struct obd_device *class_name2obd(const char *name)
545 {
546         int dev = class_name2dev(name);
547
548         if (dev < 0 || dev > class_devno_max())
549                 return NULL;
550         return class_num2obd(dev);
551 }
552 EXPORT_SYMBOL(class_name2obd);
553
554 int class_uuid2dev_nolock(struct obd_uuid *uuid)
555 {
556         int i;
557
558         for (i = 0; i < class_devno_max(); i++) {
559                 struct obd_device *obd = class_num2obd(i);
560
561                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
562                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
563                         return i;
564                 }
565         }
566
567         return -1;
568 }
569
570 int class_uuid2dev(struct obd_uuid *uuid)
571 {
572         int i;
573
574         read_lock(&obd_dev_lock);
575         i = class_uuid2dev_nolock(uuid);
576         read_unlock(&obd_dev_lock);
577
578         return i;
579 }
580 EXPORT_SYMBOL(class_uuid2dev);
581
582 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
583 {
584         int dev = class_uuid2dev(uuid);
585         if (dev < 0)
586                 return NULL;
587         return class_num2obd(dev);
588 }
589 EXPORT_SYMBOL(class_uuid2obd);
590
591 /**
592  * Get obd device from ::obd_devs[]
593  *
594  * \param num [in] array index
595  *
596  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
597  *         otherwise return the obd device there.
598  */
599 struct obd_device *class_num2obd(int num)
600 {
601         struct obd_device *obd = NULL;
602
603         if (num < class_devno_max()) {
604                 obd = obd_devs[num];
605                 if (obd == NULL)
606                         return NULL;
607
608                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
609                          "%p obd_magic %08x != %08x\n",
610                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
611                 LASSERTF(obd->obd_minor == num,
612                          "%p obd_minor %0d != %0d\n",
613                          obd, obd->obd_minor, num);
614         }
615
616         return obd;
617 }
618
619 /**
620  * Find obd in obd_dev[] by name or uuid.
621  *
622  * Increment obd's refcount if found.
623  *
624  * \param[in] str obd name or uuid
625  *
626  * \retval NULL    if not found
627  * \retval target  pointer to found obd_device
628  */
629 struct obd_device *class_dev_by_str(const char *str)
630 {
631         struct obd_device *target = NULL;
632         struct obd_uuid tgtuuid;
633         int rc;
634
635         obd_str2uuid(&tgtuuid, str);
636
637         read_lock(&obd_dev_lock);
638         rc = class_uuid2dev_nolock(&tgtuuid);
639         if (rc < 0)
640                 rc = class_name2dev_nolock(str);
641
642         if (rc >= 0)
643                 target = class_num2obd(rc);
644
645         if (target != NULL)
646                 class_incref(target, "find", current);
647         read_unlock(&obd_dev_lock);
648
649         RETURN(target);
650 }
651 EXPORT_SYMBOL(class_dev_by_str);
652
653 /**
654  * Get obd devices count. Device in any
655  *    state are counted
656  * \retval obd device count
657  */
658 int get_devices_count(void)
659 {
660         int index, max_index = class_devno_max(), dev_count = 0;
661
662         read_lock(&obd_dev_lock);
663         for (index = 0; index <= max_index; index++) {
664                 struct obd_device *obd = class_num2obd(index);
665                 if (obd != NULL)
666                         dev_count++;
667         }
668         read_unlock(&obd_dev_lock);
669
670         return dev_count;
671 }
672 EXPORT_SYMBOL(get_devices_count);
673
674 void class_obd_list(void)
675 {
676         char *status;
677         int i;
678
679         read_lock(&obd_dev_lock);
680         for (i = 0; i < class_devno_max(); i++) {
681                 struct obd_device *obd = class_num2obd(i);
682
683                 if (obd == NULL)
684                         continue;
685                 if (obd->obd_stopping)
686                         status = "ST";
687                 else if (obd->obd_set_up)
688                         status = "UP";
689                 else if (obd->obd_attached)
690                         status = "AT";
691                 else
692                         status = "--";
693                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
694                          i, status, obd->obd_type->typ_name,
695                          obd->obd_name, obd->obd_uuid.uuid,
696                          atomic_read(&obd->obd_refcount));
697         }
698         read_unlock(&obd_dev_lock);
699         return;
700 }
701
702 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
703    specified, then only the client with that uuid is returned,
704    otherwise any client connected to the tgt is returned. */
705 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
706                                           const char * typ_name,
707                                           struct obd_uuid *grp_uuid)
708 {
709         int i;
710
711         read_lock(&obd_dev_lock);
712         for (i = 0; i < class_devno_max(); i++) {
713                 struct obd_device *obd = class_num2obd(i);
714
715                 if (obd == NULL)
716                         continue;
717                 if ((strncmp(obd->obd_type->typ_name, typ_name,
718                              strlen(typ_name)) == 0)) {
719                         if (obd_uuid_equals(tgt_uuid,
720                                             &obd->u.cli.cl_target_uuid) &&
721                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
722                                                          &obd->obd_uuid) : 1)) {
723                                 read_unlock(&obd_dev_lock);
724                                 return obd;
725                         }
726                 }
727         }
728         read_unlock(&obd_dev_lock);
729
730         return NULL;
731 }
732 EXPORT_SYMBOL(class_find_client_obd);
733
734 /* Iterate the obd_device list looking devices have grp_uuid. Start
735    searching at *next, and if a device is found, the next index to look
736    at is saved in *next. If next is NULL, then the first matching device
737    will always be returned. */
738 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
739 {
740         int i;
741
742         if (next == NULL)
743                 i = 0;
744         else if (*next >= 0 && *next < class_devno_max())
745                 i = *next;
746         else
747                 return NULL;
748
749         read_lock(&obd_dev_lock);
750         for (; i < class_devno_max(); i++) {
751                 struct obd_device *obd = class_num2obd(i);
752
753                 if (obd == NULL)
754                         continue;
755                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
756                         if (next != NULL)
757                                 *next = i+1;
758                         read_unlock(&obd_dev_lock);
759                         return obd;
760                 }
761         }
762         read_unlock(&obd_dev_lock);
763
764         return NULL;
765 }
766 EXPORT_SYMBOL(class_devices_in_group);
767
768 /**
769  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
770  * adjust sptlrpc settings accordingly.
771  */
772 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
773 {
774         struct obd_device  *obd;
775         const char         *type;
776         int                 i, rc = 0, rc2;
777
778         LASSERT(namelen > 0);
779
780         read_lock(&obd_dev_lock);
781         for (i = 0; i < class_devno_max(); i++) {
782                 obd = class_num2obd(i);
783
784                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
785                         continue;
786
787                 /* only notify mdc, osc, osp, lwp, mdt, ost
788                  * because only these have a -sptlrpc llog */
789                 type = obd->obd_type->typ_name;
790                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
791                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
792                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
793                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
794                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
795                     strcmp(type, LUSTRE_OST_NAME) != 0)
796                         continue;
797
798                 if (strncmp(obd->obd_name, fsname, namelen))
799                         continue;
800
801                 class_incref(obd, __FUNCTION__, obd);
802                 read_unlock(&obd_dev_lock);
803                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
804                                          sizeof(KEY_SPTLRPC_CONF),
805                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
806                 rc = rc ? rc : rc2;
807                 class_decref(obd, __FUNCTION__, obd);
808                 read_lock(&obd_dev_lock);
809         }
810         read_unlock(&obd_dev_lock);
811         return rc;
812 }
813 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
814
815 void obd_cleanup_caches(void)
816 {
817         ENTRY;
818         if (obd_device_cachep) {
819                 kmem_cache_destroy(obd_device_cachep);
820                 obd_device_cachep = NULL;
821         }
822         if (obdo_cachep) {
823                 kmem_cache_destroy(obdo_cachep);
824                 obdo_cachep = NULL;
825         }
826         if (import_cachep) {
827                 kmem_cache_destroy(import_cachep);
828                 import_cachep = NULL;
829         }
830
831         EXIT;
832 }
833
834 int obd_init_caches(void)
835 {
836         int rc;
837         ENTRY;
838
839         LASSERT(obd_device_cachep == NULL);
840         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
841                                               sizeof(struct obd_device),
842                                               0, 0, NULL);
843         if (!obd_device_cachep)
844                 GOTO(out, rc = -ENOMEM);
845
846         LASSERT(obdo_cachep == NULL);
847         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
848                                         0, 0, NULL);
849         if (!obdo_cachep)
850                 GOTO(out, rc = -ENOMEM);
851
852         LASSERT(import_cachep == NULL);
853         import_cachep = kmem_cache_create("ll_import_cache",
854                                           sizeof(struct obd_import),
855                                           0, 0, NULL);
856         if (!import_cachep)
857                 GOTO(out, rc = -ENOMEM);
858
859         RETURN(0);
860 out:
861         obd_cleanup_caches();
862         RETURN(rc);
863 }
864
865 /* map connection to client */
866 struct obd_export *class_conn2export(struct lustre_handle *conn)
867 {
868         struct obd_export *export;
869         ENTRY;
870
871         if (!conn) {
872                 CDEBUG(D_CACHE, "looking for null handle\n");
873                 RETURN(NULL);
874         }
875
876         if (conn->cookie == -1) {  /* this means assign a new connection */
877                 CDEBUG(D_CACHE, "want a new connection\n");
878                 RETURN(NULL);
879         }
880
881         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
882         export = class_handle2object(conn->cookie, NULL);
883         RETURN(export);
884 }
885 EXPORT_SYMBOL(class_conn2export);
886
887 struct obd_device *class_exp2obd(struct obd_export *exp)
888 {
889         if (exp)
890                 return exp->exp_obd;
891         return NULL;
892 }
893 EXPORT_SYMBOL(class_exp2obd);
894
895 struct obd_device *class_conn2obd(struct lustre_handle *conn)
896 {
897         struct obd_export *export;
898         export = class_conn2export(conn);
899         if (export) {
900                 struct obd_device *obd = export->exp_obd;
901                 class_export_put(export);
902                 return obd;
903         }
904         return NULL;
905 }
906
907 struct obd_import *class_exp2cliimp(struct obd_export *exp)
908 {
909         struct obd_device *obd = exp->exp_obd;
910         if (obd == NULL)
911                 return NULL;
912         return obd->u.cli.cl_import;
913 }
914 EXPORT_SYMBOL(class_exp2cliimp);
915
916 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
917 {
918         struct obd_device *obd = class_conn2obd(conn);
919         if (obd == NULL)
920                 return NULL;
921         return obd->u.cli.cl_import;
922 }
923
924 /* Export management functions */
925 static void class_export_destroy(struct obd_export *exp)
926 {
927         struct obd_device *obd = exp->exp_obd;
928         ENTRY;
929
930         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
931         LASSERT(obd != NULL);
932
933         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
934                exp->exp_client_uuid.uuid, obd->obd_name);
935
936         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
937         if (exp->exp_connection)
938                 ptlrpc_put_connection_superhack(exp->exp_connection);
939
940         LASSERT(list_empty(&exp->exp_outstanding_replies));
941         LASSERT(list_empty(&exp->exp_uncommitted_replies));
942         LASSERT(list_empty(&exp->exp_req_replay_queue));
943         LASSERT(list_empty(&exp->exp_hp_rpcs));
944         obd_destroy_export(exp);
945         /* self export doesn't hold a reference to an obd, although it
946          * exists until freeing of the obd */
947         if (exp != obd->obd_self_export)
948                 class_decref(obd, "export", exp);
949
950         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
951         EXIT;
952 }
953
954 static void export_handle_addref(void *export)
955 {
956         class_export_get(export);
957 }
958
959 static struct portals_handle_ops export_handle_ops = {
960         .hop_addref = export_handle_addref,
961         .hop_free   = NULL,
962 };
963
964 struct obd_export *class_export_get(struct obd_export *exp)
965 {
966         atomic_inc(&exp->exp_refcount);
967         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
968                atomic_read(&exp->exp_refcount));
969         return exp;
970 }
971 EXPORT_SYMBOL(class_export_get);
972
973 void class_export_put(struct obd_export *exp)
974 {
975         LASSERT(exp != NULL);
976         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
977         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
978                atomic_read(&exp->exp_refcount) - 1);
979
980         if (atomic_dec_and_test(&exp->exp_refcount)) {
981                 struct obd_device *obd = exp->exp_obd;
982
983                 CDEBUG(D_IOCTL, "final put %p/%s\n",
984                        exp, exp->exp_client_uuid.uuid);
985
986                 /* release nid stat refererence */
987                 lprocfs_exp_cleanup(exp);
988
989                 if (exp == obd->obd_self_export) {
990                         /* self export should be destroyed without
991                          * zombie thread as it doesn't hold a
992                          * reference to obd and doesn't hold any
993                          * resources */
994                         class_export_destroy(exp);
995                         /* self export is destroyed, no class
996                          * references exist and it is safe to free
997                          * obd */
998                         class_free_dev(obd);
999                 } else {
1000                         LASSERT(!list_empty(&exp->exp_obd_chain));
1001                         obd_zombie_export_add(exp);
1002                 }
1003
1004         }
1005 }
1006 EXPORT_SYMBOL(class_export_put);
1007 /* Creates a new export, adds it to the hash table, and returns a
1008  * pointer to it. The refcount is 2: one for the hash reference, and
1009  * one for the pointer returned by this function. */
1010 struct obd_export *__class_new_export(struct obd_device *obd,
1011                                       struct obd_uuid *cluuid, bool is_self)
1012 {
1013         struct obd_export *export;
1014         struct cfs_hash *hash = NULL;
1015         int rc = 0;
1016         ENTRY;
1017
1018         OBD_ALLOC_PTR(export);
1019         if (!export)
1020                 return ERR_PTR(-ENOMEM);
1021
1022         export->exp_conn_cnt = 0;
1023         export->exp_lock_hash = NULL;
1024         export->exp_flock_hash = NULL;
1025         /* 2 = class_handle_hash + last */
1026         atomic_set(&export->exp_refcount, 2);
1027         atomic_set(&export->exp_rpc_count, 0);
1028         atomic_set(&export->exp_cb_count, 0);
1029         atomic_set(&export->exp_locks_count, 0);
1030 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1031         INIT_LIST_HEAD(&export->exp_locks_list);
1032         spin_lock_init(&export->exp_locks_list_guard);
1033 #endif
1034         atomic_set(&export->exp_replay_count, 0);
1035         export->exp_obd = obd;
1036         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1037         spin_lock_init(&export->exp_uncommitted_replies_lock);
1038         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1039         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1040         INIT_LIST_HEAD(&export->exp_handle.h_link);
1041         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1042         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1043         class_handle_hash(&export->exp_handle, &export_handle_ops);
1044         export->exp_last_request_time = ktime_get_real_seconds();
1045         spin_lock_init(&export->exp_lock);
1046         spin_lock_init(&export->exp_rpc_lock);
1047         INIT_HLIST_NODE(&export->exp_uuid_hash);
1048         INIT_HLIST_NODE(&export->exp_nid_hash);
1049         INIT_HLIST_NODE(&export->exp_gen_hash);
1050         spin_lock_init(&export->exp_bl_list_lock);
1051         INIT_LIST_HEAD(&export->exp_bl_list);
1052         INIT_LIST_HEAD(&export->exp_stale_list);
1053
1054         export->exp_sp_peer = LUSTRE_SP_ANY;
1055         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1056         export->exp_client_uuid = *cluuid;
1057         obd_init_export(export);
1058
1059         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1060                 spin_lock(&obd->obd_dev_lock);
1061                 /* shouldn't happen, but might race */
1062                 if (obd->obd_stopping)
1063                         GOTO(exit_unlock, rc = -ENODEV);
1064
1065                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1066                 if (hash == NULL)
1067                         GOTO(exit_unlock, rc = -ENODEV);
1068                 spin_unlock(&obd->obd_dev_lock);
1069
1070                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1071                 if (rc != 0) {
1072                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1073                                       obd->obd_name, cluuid->uuid, rc);
1074                         GOTO(exit_err, rc = -EALREADY);
1075                 }
1076         }
1077
1078         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1079         spin_lock(&obd->obd_dev_lock);
1080         if (obd->obd_stopping) {
1081                 if (hash)
1082                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1083                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1084         }
1085
1086         if (!is_self) {
1087                 class_incref(obd, "export", export);
1088                 list_add_tail(&export->exp_obd_chain_timed,
1089                               &obd->obd_exports_timed);
1090                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1091                 obd->obd_num_exports++;
1092         } else {
1093                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1094                 INIT_LIST_HEAD(&export->exp_obd_chain);
1095         }
1096         spin_unlock(&obd->obd_dev_lock);
1097         if (hash)
1098                 cfs_hash_putref(hash);
1099         RETURN(export);
1100
1101 exit_unlock:
1102         spin_unlock(&obd->obd_dev_lock);
1103 exit_err:
1104         if (hash)
1105                 cfs_hash_putref(hash);
1106         class_handle_unhash(&export->exp_handle);
1107         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1108         obd_destroy_export(export);
1109         OBD_FREE_PTR(export);
1110         return ERR_PTR(rc);
1111 }
1112
1113 struct obd_export *class_new_export(struct obd_device *obd,
1114                                     struct obd_uuid *uuid)
1115 {
1116         return __class_new_export(obd, uuid, false);
1117 }
1118 EXPORT_SYMBOL(class_new_export);
1119
1120 struct obd_export *class_new_export_self(struct obd_device *obd,
1121                                          struct obd_uuid *uuid)
1122 {
1123         return __class_new_export(obd, uuid, true);
1124 }
1125
1126 void class_unlink_export(struct obd_export *exp)
1127 {
1128         class_handle_unhash(&exp->exp_handle);
1129
1130         if (exp->exp_obd->obd_self_export == exp) {
1131                 class_export_put(exp);
1132                 return;
1133         }
1134
1135         spin_lock(&exp->exp_obd->obd_dev_lock);
1136         /* delete an uuid-export hashitem from hashtables */
1137         if (!hlist_unhashed(&exp->exp_uuid_hash))
1138                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1139                              &exp->exp_client_uuid,
1140                              &exp->exp_uuid_hash);
1141
1142 #ifdef HAVE_SERVER_SUPPORT
1143         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144                 struct tg_export_data   *ted = &exp->exp_target_data;
1145                 struct cfs_hash         *hash;
1146
1147                 /* Because obd_gen_hash will not be released until
1148                  * class_cleanup(), so hash should never be NULL here */
1149                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150                 LASSERT(hash != NULL);
1151                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152                              &exp->exp_gen_hash);
1153                 cfs_hash_putref(hash);
1154         }
1155 #endif /* HAVE_SERVER_SUPPORT */
1156
1157         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158         list_del_init(&exp->exp_obd_chain_timed);
1159         exp->exp_obd->obd_num_exports--;
1160         spin_unlock(&exp->exp_obd->obd_dev_lock);
1161         atomic_inc(&obd_stale_export_num);
1162
1163         /* A reference is kept by obd_stale_exports list */
1164         obd_stale_export_put(exp);
1165 }
1166 EXPORT_SYMBOL(class_unlink_export);
1167
1168 /* Import management functions */
1169 static void class_import_destroy(struct obd_import *imp)
1170 {
1171         ENTRY;
1172
1173         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174                 imp->imp_obd->obd_name);
1175
1176         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1177
1178         ptlrpc_put_connection_superhack(imp->imp_connection);
1179
1180         while (!list_empty(&imp->imp_conn_list)) {
1181                 struct obd_import_conn *imp_conn;
1182
1183                 imp_conn = list_entry(imp->imp_conn_list.next,
1184                                       struct obd_import_conn, oic_item);
1185                 list_del_init(&imp_conn->oic_item);
1186                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1188         }
1189
1190         LASSERT(imp->imp_sec == NULL);
1191         class_decref(imp->imp_obd, "import", imp);
1192         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1193         EXIT;
1194 }
1195
1196 static void import_handle_addref(void *import)
1197 {
1198         class_import_get(import);
1199 }
1200
1201 static struct portals_handle_ops import_handle_ops = {
1202         .hop_addref = import_handle_addref,
1203         .hop_free   = NULL,
1204 };
1205
1206 struct obd_import *class_import_get(struct obd_import *import)
1207 {
1208         atomic_inc(&import->imp_refcount);
1209         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1210                atomic_read(&import->imp_refcount),
1211                import->imp_obd->obd_name);
1212         return import;
1213 }
1214 EXPORT_SYMBOL(class_import_get);
1215
1216 void class_import_put(struct obd_import *imp)
1217 {
1218         ENTRY;
1219
1220         LASSERT(list_empty(&imp->imp_zombie_chain));
1221         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1222
1223         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1224                atomic_read(&imp->imp_refcount) - 1,
1225                imp->imp_obd->obd_name);
1226
1227         if (atomic_dec_and_test(&imp->imp_refcount)) {
1228                 CDEBUG(D_INFO, "final put import %p\n", imp);
1229                 obd_zombie_import_add(imp);
1230         }
1231
1232         /* catch possible import put race */
1233         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1234         EXIT;
1235 }
1236 EXPORT_SYMBOL(class_import_put);
1237
1238 static void init_imp_at(struct imp_at *at) {
1239         int i;
1240         at_init(&at->iat_net_latency, 0, 0);
1241         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1242                 /* max service estimates are tracked on the server side, so
1243                    don't use the AT history here, just use the last reported
1244                    val. (But keep hist for proc histogram, worst_ever) */
1245                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1246                         AT_FLG_NOHIST);
1247         }
1248 }
1249
1250 struct obd_import *class_new_import(struct obd_device *obd)
1251 {
1252         struct obd_import *imp;
1253         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1254
1255         OBD_ALLOC(imp, sizeof(*imp));
1256         if (imp == NULL)
1257                 return NULL;
1258
1259         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1260         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1261         INIT_LIST_HEAD(&imp->imp_replay_list);
1262         INIT_LIST_HEAD(&imp->imp_sending_list);
1263         INIT_LIST_HEAD(&imp->imp_delayed_list);
1264         INIT_LIST_HEAD(&imp->imp_committed_list);
1265         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1266         imp->imp_known_replied_xid = 0;
1267         imp->imp_replay_cursor = &imp->imp_committed_list;
1268         spin_lock_init(&imp->imp_lock);
1269         imp->imp_last_success_conn = 0;
1270         imp->imp_state = LUSTRE_IMP_NEW;
1271         imp->imp_obd = class_incref(obd, "import", imp);
1272         mutex_init(&imp->imp_sec_mutex);
1273         init_waitqueue_head(&imp->imp_recovery_waitq);
1274
1275         if (curr_pid_ns->child_reaper)
1276                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1277         else
1278                 imp->imp_sec_refpid = 1;
1279
1280         atomic_set(&imp->imp_refcount, 2);
1281         atomic_set(&imp->imp_unregistering, 0);
1282         atomic_set(&imp->imp_inflight, 0);
1283         atomic_set(&imp->imp_replay_inflight, 0);
1284         atomic_set(&imp->imp_inval_count, 0);
1285         INIT_LIST_HEAD(&imp->imp_conn_list);
1286         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1287         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1288         init_imp_at(&imp->imp_at);
1289
1290         /* the default magic is V2, will be used in connect RPC, and
1291          * then adjusted according to the flags in request/reply. */
1292         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1293
1294         return imp;
1295 }
1296 EXPORT_SYMBOL(class_new_import);
1297
1298 void class_destroy_import(struct obd_import *import)
1299 {
1300         LASSERT(import != NULL);
1301         LASSERT(import != LP_POISON);
1302
1303         class_handle_unhash(&import->imp_handle);
1304
1305         spin_lock(&import->imp_lock);
1306         import->imp_generation++;
1307         spin_unlock(&import->imp_lock);
1308         class_import_put(import);
1309 }
1310 EXPORT_SYMBOL(class_destroy_import);
1311
1312 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1313
1314 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1315 {
1316         spin_lock(&exp->exp_locks_list_guard);
1317
1318         LASSERT(lock->l_exp_refs_nr >= 0);
1319
1320         if (lock->l_exp_refs_target != NULL &&
1321             lock->l_exp_refs_target != exp) {
1322                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1323                               exp, lock, lock->l_exp_refs_target);
1324         }
1325         if ((lock->l_exp_refs_nr ++) == 0) {
1326                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1327                 lock->l_exp_refs_target = exp;
1328         }
1329         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1330                lock, exp, lock->l_exp_refs_nr);
1331         spin_unlock(&exp->exp_locks_list_guard);
1332 }
1333 EXPORT_SYMBOL(__class_export_add_lock_ref);
1334
1335 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1336 {
1337         spin_lock(&exp->exp_locks_list_guard);
1338         LASSERT(lock->l_exp_refs_nr > 0);
1339         if (lock->l_exp_refs_target != exp) {
1340                 LCONSOLE_WARN("lock %p, "
1341                               "mismatching export pointers: %p, %p\n",
1342                               lock, lock->l_exp_refs_target, exp);
1343         }
1344         if (-- lock->l_exp_refs_nr == 0) {
1345                 list_del_init(&lock->l_exp_refs_link);
1346                 lock->l_exp_refs_target = NULL;
1347         }
1348         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1349                lock, exp, lock->l_exp_refs_nr);
1350         spin_unlock(&exp->exp_locks_list_guard);
1351 }
1352 EXPORT_SYMBOL(__class_export_del_lock_ref);
1353 #endif
1354
1355 /* A connection defines an export context in which preallocation can
1356    be managed. This releases the export pointer reference, and returns
1357    the export handle, so the export refcount is 1 when this function
1358    returns. */
1359 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1360                   struct obd_uuid *cluuid)
1361 {
1362         struct obd_export *export;
1363         LASSERT(conn != NULL);
1364         LASSERT(obd != NULL);
1365         LASSERT(cluuid != NULL);
1366         ENTRY;
1367
1368         export = class_new_export(obd, cluuid);
1369         if (IS_ERR(export))
1370                 RETURN(PTR_ERR(export));
1371
1372         conn->cookie = export->exp_handle.h_cookie;
1373         class_export_put(export);
1374
1375         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1376                cluuid->uuid, conn->cookie);
1377         RETURN(0);
1378 }
1379 EXPORT_SYMBOL(class_connect);
1380
1381 /* if export is involved in recovery then clean up related things */
1382 static void class_export_recovery_cleanup(struct obd_export *exp)
1383 {
1384         struct obd_device *obd = exp->exp_obd;
1385
1386         spin_lock(&obd->obd_recovery_task_lock);
1387         if (obd->obd_recovering) {
1388                 if (exp->exp_in_recovery) {
1389                         spin_lock(&exp->exp_lock);
1390                         exp->exp_in_recovery = 0;
1391                         spin_unlock(&exp->exp_lock);
1392                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1393                         atomic_dec(&obd->obd_connected_clients);
1394                 }
1395
1396                 /* if called during recovery then should update
1397                  * obd_stale_clients counter,
1398                  * lightweight exports are not counted */
1399                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1400                         exp->exp_obd->obd_stale_clients++;
1401         }
1402         spin_unlock(&obd->obd_recovery_task_lock);
1403
1404         spin_lock(&exp->exp_lock);
1405         /** Cleanup req replay fields */
1406         if (exp->exp_req_replay_needed) {
1407                 exp->exp_req_replay_needed = 0;
1408
1409                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1410                 atomic_dec(&obd->obd_req_replay_clients);
1411         }
1412
1413         /** Cleanup lock replay data */
1414         if (exp->exp_lock_replay_needed) {
1415                 exp->exp_lock_replay_needed = 0;
1416
1417                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1418                 atomic_dec(&obd->obd_lock_replay_clients);
1419         }
1420         spin_unlock(&exp->exp_lock);
1421 }
1422
1423 /* This function removes 1-3 references from the export:
1424  * 1 - for export pointer passed
1425  * and if disconnect really need
1426  * 2 - removing from hash
1427  * 3 - in client_unlink_export
1428  * The export pointer passed to this function can destroyed */
1429 int class_disconnect(struct obd_export *export)
1430 {
1431         int already_disconnected;
1432         ENTRY;
1433
1434         if (export == NULL) {
1435                 CWARN("attempting to free NULL export %p\n", export);
1436                 RETURN(-EINVAL);
1437         }
1438
1439         spin_lock(&export->exp_lock);
1440         already_disconnected = export->exp_disconnected;
1441         export->exp_disconnected = 1;
1442         /*  We hold references of export for uuid hash
1443          *  and nid_hash and export link at least. So
1444          *  it is safe to call cfs_hash_del in there.  */
1445         if (!hlist_unhashed(&export->exp_nid_hash))
1446                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1447                              &export->exp_connection->c_peer.nid,
1448                              &export->exp_nid_hash);
1449         spin_unlock(&export->exp_lock);
1450
1451         /* class_cleanup(), abort_recovery(), and class_fail_export()
1452          * all end up in here, and if any of them race we shouldn't
1453          * call extra class_export_puts(). */
1454         if (already_disconnected) {
1455                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1456                 GOTO(no_disconn, already_disconnected);
1457         }
1458
1459         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1460                export->exp_handle.h_cookie);
1461
1462         class_export_recovery_cleanup(export);
1463         class_unlink_export(export);
1464 no_disconn:
1465         class_export_put(export);
1466         RETURN(0);
1467 }
1468 EXPORT_SYMBOL(class_disconnect);
1469
1470 /* Return non-zero for a fully connected export */
1471 int class_connected_export(struct obd_export *exp)
1472 {
1473         int connected = 0;
1474
1475         if (exp) {
1476                 spin_lock(&exp->exp_lock);
1477                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1478                 spin_unlock(&exp->exp_lock);
1479         }
1480         return connected;
1481 }
1482 EXPORT_SYMBOL(class_connected_export);
1483
1484 static void class_disconnect_export_list(struct list_head *list,
1485                                          enum obd_option flags)
1486 {
1487         int rc;
1488         struct obd_export *exp;
1489         ENTRY;
1490
1491         /* It's possible that an export may disconnect itself, but
1492          * nothing else will be added to this list. */
1493         while (!list_empty(list)) {
1494                 exp = list_entry(list->next, struct obd_export,
1495                                  exp_obd_chain);
1496                 /* need for safe call CDEBUG after obd_disconnect */
1497                 class_export_get(exp);
1498
1499                 spin_lock(&exp->exp_lock);
1500                 exp->exp_flags = flags;
1501                 spin_unlock(&exp->exp_lock);
1502
1503                 if (obd_uuid_equals(&exp->exp_client_uuid,
1504                                     &exp->exp_obd->obd_uuid)) {
1505                         CDEBUG(D_HA,
1506                                "exp %p export uuid == obd uuid, don't discon\n",
1507                                exp);
1508                         /* Need to delete this now so we don't end up pointing
1509                          * to work_list later when this export is cleaned up. */
1510                         list_del_init(&exp->exp_obd_chain);
1511                         class_export_put(exp);
1512                         continue;
1513                 }
1514
1515                 class_export_get(exp);
1516                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1517                        "last request at %lld\n",
1518                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1519                        exp, exp->exp_last_request_time);
1520                 /* release one export reference anyway */
1521                 rc = obd_disconnect(exp);
1522
1523                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1524                        obd_export_nid2str(exp), exp, rc);
1525                 class_export_put(exp);
1526         }
1527         EXIT;
1528 }
1529
1530 void class_disconnect_exports(struct obd_device *obd)
1531 {
1532         struct list_head work_list;
1533         ENTRY;
1534
1535         /* Move all of the exports from obd_exports to a work list, en masse. */
1536         INIT_LIST_HEAD(&work_list);
1537         spin_lock(&obd->obd_dev_lock);
1538         list_splice_init(&obd->obd_exports, &work_list);
1539         list_splice_init(&obd->obd_delayed_exports, &work_list);
1540         spin_unlock(&obd->obd_dev_lock);
1541
1542         if (!list_empty(&work_list)) {
1543                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1544                        "disconnecting them\n", obd->obd_minor, obd);
1545                 class_disconnect_export_list(&work_list,
1546                                              exp_flags_from_obd(obd));
1547         } else
1548                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1549                        obd->obd_minor, obd);
1550         EXIT;
1551 }
1552 EXPORT_SYMBOL(class_disconnect_exports);
1553
1554 /* Remove exports that have not completed recovery.
1555  */
1556 void class_disconnect_stale_exports(struct obd_device *obd,
1557                                     int (*test_export)(struct obd_export *))
1558 {
1559         struct list_head work_list;
1560         struct obd_export *exp, *n;
1561         int evicted = 0;
1562         ENTRY;
1563
1564         INIT_LIST_HEAD(&work_list);
1565         spin_lock(&obd->obd_dev_lock);
1566         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1567                                  exp_obd_chain) {
1568                 /* don't count self-export as client */
1569                 if (obd_uuid_equals(&exp->exp_client_uuid,
1570                                     &exp->exp_obd->obd_uuid))
1571                         continue;
1572
1573                 /* don't evict clients which have no slot in last_rcvd
1574                  * (e.g. lightweight connection) */
1575                 if (exp->exp_target_data.ted_lr_idx == -1)
1576                         continue;
1577
1578                 spin_lock(&exp->exp_lock);
1579                 if (exp->exp_failed || test_export(exp)) {
1580                         spin_unlock(&exp->exp_lock);
1581                         continue;
1582                 }
1583                 exp->exp_failed = 1;
1584                 spin_unlock(&exp->exp_lock);
1585
1586                 list_move(&exp->exp_obd_chain, &work_list);
1587                 evicted++;
1588                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1589                        obd->obd_name, exp->exp_client_uuid.uuid,
1590                        exp->exp_connection == NULL ? "<unknown>" :
1591                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1592                 print_export_data(exp, "EVICTING", 0, D_HA);
1593         }
1594         spin_unlock(&obd->obd_dev_lock);
1595
1596         if (evicted)
1597                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1598                               obd->obd_name, evicted);
1599
1600         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1601                                                  OBD_OPT_ABORT_RECOV);
1602         EXIT;
1603 }
1604 EXPORT_SYMBOL(class_disconnect_stale_exports);
1605
1606 void class_fail_export(struct obd_export *exp)
1607 {
1608         int rc, already_failed;
1609
1610         spin_lock(&exp->exp_lock);
1611         already_failed = exp->exp_failed;
1612         exp->exp_failed = 1;
1613         spin_unlock(&exp->exp_lock);
1614
1615         if (already_failed) {
1616                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1617                        exp, exp->exp_client_uuid.uuid);
1618                 return;
1619         }
1620
1621         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1622                exp, exp->exp_client_uuid.uuid);
1623
1624         if (obd_dump_on_timeout)
1625                 libcfs_debug_dumplog();
1626
1627         /* need for safe call CDEBUG after obd_disconnect */
1628         class_export_get(exp);
1629
1630         /* Most callers into obd_disconnect are removing their own reference
1631          * (request, for example) in addition to the one from the hash table.
1632          * We don't have such a reference here, so make one. */
1633         class_export_get(exp);
1634         rc = obd_disconnect(exp);
1635         if (rc)
1636                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1637         else
1638                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1639                        exp, exp->exp_client_uuid.uuid);
1640         class_export_put(exp);
1641 }
1642 EXPORT_SYMBOL(class_fail_export);
1643
1644 char *obd_export_nid2str(struct obd_export *exp)
1645 {
1646         if (exp->exp_connection != NULL)
1647                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1648
1649         return "(no nid)";
1650 }
1651 EXPORT_SYMBOL(obd_export_nid2str);
1652
1653 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1654 {
1655         struct cfs_hash *nid_hash;
1656         struct obd_export *doomed_exp = NULL;
1657         int exports_evicted = 0;
1658
1659         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1660
1661         spin_lock(&obd->obd_dev_lock);
1662         /* umount has run already, so evict thread should leave
1663          * its task to umount thread now */
1664         if (obd->obd_stopping) {
1665                 spin_unlock(&obd->obd_dev_lock);
1666                 return exports_evicted;
1667         }
1668         nid_hash = obd->obd_nid_hash;
1669         cfs_hash_getref(nid_hash);
1670         spin_unlock(&obd->obd_dev_lock);
1671
1672         do {
1673                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1674                 if (doomed_exp == NULL)
1675                         break;
1676
1677                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1678                          "nid %s found, wanted nid %s, requested nid %s\n",
1679                          obd_export_nid2str(doomed_exp),
1680                          libcfs_nid2str(nid_key), nid);
1681                 LASSERTF(doomed_exp != obd->obd_self_export,
1682                          "self-export is hashed by NID?\n");
1683                 exports_evicted++;
1684                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1685                               "request\n", obd->obd_name,
1686                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1687                               obd_export_nid2str(doomed_exp));
1688                 class_fail_export(doomed_exp);
1689                 class_export_put(doomed_exp);
1690         } while (1);
1691
1692         cfs_hash_putref(nid_hash);
1693
1694         if (!exports_evicted)
1695                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1696                        obd->obd_name, nid);
1697         return exports_evicted;
1698 }
1699 EXPORT_SYMBOL(obd_export_evict_by_nid);
1700
1701 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1702 {
1703         struct cfs_hash *uuid_hash;
1704         struct obd_export *doomed_exp = NULL;
1705         struct obd_uuid doomed_uuid;
1706         int exports_evicted = 0;
1707
1708         spin_lock(&obd->obd_dev_lock);
1709         if (obd->obd_stopping) {
1710                 spin_unlock(&obd->obd_dev_lock);
1711                 return exports_evicted;
1712         }
1713         uuid_hash = obd->obd_uuid_hash;
1714         cfs_hash_getref(uuid_hash);
1715         spin_unlock(&obd->obd_dev_lock);
1716
1717         obd_str2uuid(&doomed_uuid, uuid);
1718         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1719                 CERROR("%s: can't evict myself\n", obd->obd_name);
1720                 cfs_hash_putref(uuid_hash);
1721                 return exports_evicted;
1722         }
1723
1724         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1725
1726         if (doomed_exp == NULL) {
1727                 CERROR("%s: can't disconnect %s: no exports found\n",
1728                        obd->obd_name, uuid);
1729         } else {
1730                 CWARN("%s: evicting %s at adminstrative request\n",
1731                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1732                 class_fail_export(doomed_exp);
1733                 class_export_put(doomed_exp);
1734                 exports_evicted++;
1735         }
1736         cfs_hash_putref(uuid_hash);
1737
1738         return exports_evicted;
1739 }
1740
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it when it
 * is non-NULL and lock dumping was requested. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1745
1746 static void print_export_data(struct obd_export *exp, const char *status,
1747                               int locks, int debug_level)
1748 {
1749         struct ptlrpc_reply_state *rs;
1750         struct ptlrpc_reply_state *first_reply = NULL;
1751         int nreplies = 0;
1752
1753         spin_lock(&exp->exp_lock);
1754         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1755                             rs_exp_list) {
1756                 if (nreplies == 0)
1757                         first_reply = rs;
1758                 nreplies++;
1759         }
1760         spin_unlock(&exp->exp_lock);
1761
1762         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1763                "%p %s %llu stale:%d\n",
1764                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1765                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1766                atomic_read(&exp->exp_rpc_count),
1767                atomic_read(&exp->exp_cb_count),
1768                atomic_read(&exp->exp_locks_count),
1769                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1770                nreplies, first_reply, nreplies > 3 ? "..." : "",
1771                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1772 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1773         if (locks && class_export_dump_hook != NULL)
1774                 class_export_dump_hook(exp);
1775 #endif
1776 }
1777
1778 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1779 {
1780         struct obd_export *exp;
1781
1782         spin_lock(&obd->obd_dev_lock);
1783         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1784                 print_export_data(exp, "ACTIVE", locks, debug_level);
1785         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1786                 print_export_data(exp, "UNLINKED", locks, debug_level);
1787         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1788                 print_export_data(exp, "DELAYED", locks, debug_level);
1789         spin_unlock(&obd->obd_dev_lock);
1790         spin_lock(&obd_zombie_impexp_lock);
1791         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1792                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1793         spin_unlock(&obd_zombie_impexp_lock);
1794 }
1795
1796 void obd_exports_barrier(struct obd_device *obd)
1797 {
1798         int waited = 2;
1799         LASSERT(list_empty(&obd->obd_exports));
1800         spin_lock(&obd->obd_dev_lock);
1801         while (!list_empty(&obd->obd_unlinked_exports)) {
1802                 spin_unlock(&obd->obd_dev_lock);
1803                 set_current_state(TASK_UNINTERRUPTIBLE);
1804                 schedule_timeout(cfs_time_seconds(waited));
1805                 if (waited > 5 && is_power_of_2(waited)) {
1806                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1807                                       "more than %d seconds. "
1808                                       "The obd refcount = %d. Is it stuck?\n",
1809                                       obd->obd_name, waited,
1810                                       atomic_read(&obd->obd_refcount));
1811                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1812                 }
1813                 waited *= 2;
1814                 spin_lock(&obd->obd_dev_lock);
1815         }
1816         spin_unlock(&obd->obd_dev_lock);
1817 }
1818 EXPORT_SYMBOL(obd_exports_barrier);
1819
/* Number of zombie imports and exports queued for destruction; modified
 * under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1822
1823 /**
1824  * kill zombie imports and exports
1825  */
1826 void obd_zombie_impexp_cull(void)
1827 {
1828         struct obd_import *import;
1829         struct obd_export *export;
1830         ENTRY;
1831
1832         do {
1833                 spin_lock(&obd_zombie_impexp_lock);
1834
1835                 import = NULL;
1836                 if (!list_empty(&obd_zombie_imports)) {
1837                         import = list_entry(obd_zombie_imports.next,
1838                                             struct obd_import,
1839                                             imp_zombie_chain);
1840                         list_del_init(&import->imp_zombie_chain);
1841                 }
1842
1843                 export = NULL;
1844                 if (!list_empty(&obd_zombie_exports)) {
1845                         export = list_entry(obd_zombie_exports.next,
1846                                             struct obd_export,
1847                                             exp_obd_chain);
1848                         list_del_init(&export->exp_obd_chain);
1849                 }
1850
1851                 spin_unlock(&obd_zombie_impexp_lock);
1852
1853                 if (import != NULL) {
1854                         class_import_destroy(import);
1855                         spin_lock(&obd_zombie_impexp_lock);
1856                         zombies_count--;
1857                         spin_unlock(&obd_zombie_impexp_lock);
1858                 }
1859
1860                 if (export != NULL) {
1861                         class_export_destroy(export);
1862                         spin_lock(&obd_zombie_impexp_lock);
1863                         zombies_count--;
1864                         spin_unlock(&obd_zombie_impexp_lock);
1865                 }
1866
1867                 cond_resched();
1868         } while (import != NULL || export != NULL);
1869         EXIT;
1870 }
1871
1872 static DECLARE_COMPLETION(obd_zombie_start);
1873 static DECLARE_COMPLETION(obd_zombie_stop);
1874 static unsigned long obd_zombie_flags;
1875 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1876 static pid_t obd_zombie_pid;
1877
1878 enum {
1879         OBD_ZOMBIE_STOP         = 0x0001,
1880 };
1881
1882 /**
1883  * check for work for kill zombie import/export thread.
1884  */
1885 static int obd_zombie_impexp_check(void *arg)
1886 {
1887         int rc;
1888
1889         spin_lock(&obd_zombie_impexp_lock);
1890         rc = (zombies_count == 0) &&
1891              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1892         spin_unlock(&obd_zombie_impexp_lock);
1893
1894         RETURN(rc);
1895 }
1896
1897 /**
1898  * Add export to the obd_zombe thread and notify it.
1899  */
1900 static void obd_zombie_export_add(struct obd_export *exp) {
1901         atomic_dec(&obd_stale_export_num);
1902         spin_lock(&exp->exp_obd->obd_dev_lock);
1903         LASSERT(!list_empty(&exp->exp_obd_chain));
1904         list_del_init(&exp->exp_obd_chain);
1905         spin_unlock(&exp->exp_obd->obd_dev_lock);
1906         spin_lock(&obd_zombie_impexp_lock);
1907         zombies_count++;
1908         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1909         spin_unlock(&obd_zombie_impexp_lock);
1910
1911         obd_zombie_impexp_notify();
1912 }
1913
1914 /**
1915  * Add import to the obd_zombe thread and notify it.
1916  */
1917 static void obd_zombie_import_add(struct obd_import *imp) {
1918         LASSERT(imp->imp_sec == NULL);
1919         spin_lock(&obd_zombie_impexp_lock);
1920         LASSERT(list_empty(&imp->imp_zombie_chain));
1921         zombies_count++;
1922         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1923         spin_unlock(&obd_zombie_impexp_lock);
1924
1925         obd_zombie_impexp_notify();
1926 }
1927
1928 /**
1929  * notify import/export destroy thread about new zombie.
1930  */
1931 static void obd_zombie_impexp_notify(void)
1932 {
1933         /*
1934          * Make sure obd_zomebie_impexp_thread get this notification.
1935          * It is possible this signal only get by obd_zombie_barrier, and
1936          * barrier gulps this notification and sleeps away and hangs ensues
1937          */
1938         wake_up_all(&obd_zombie_waitq);
1939 }
1940
1941 /**
1942  * check whether obd_zombie is idle
1943  */
1944 static int obd_zombie_is_idle(void)
1945 {
1946         int rc;
1947
1948         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1949         spin_lock(&obd_zombie_impexp_lock);
1950         rc = (zombies_count == 0);
1951         spin_unlock(&obd_zombie_impexp_lock);
1952         return rc;
1953 }
1954
1955 /**
1956  * wait when obd_zombie import/export queues become empty
1957  */
1958 void obd_zombie_barrier(void)
1959 {
1960         struct l_wait_info lwi = { 0 };
1961
1962         if (obd_zombie_pid == current_pid())
1963                 /* don't wait for myself */
1964                 return;
1965         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1966 }
1967 EXPORT_SYMBOL(obd_zombie_barrier);
1968
1969
1970 struct obd_export *obd_stale_export_get(void)
1971 {
1972         struct obd_export *exp = NULL;
1973         ENTRY;
1974
1975         spin_lock(&obd_stale_export_lock);
1976         if (!list_empty(&obd_stale_exports)) {
1977                 exp = list_entry(obd_stale_exports.next,
1978                                  struct obd_export, exp_stale_list);
1979                 list_del_init(&exp->exp_stale_list);
1980         }
1981         spin_unlock(&obd_stale_export_lock);
1982
1983         if (exp) {
1984                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1985                        atomic_read(&obd_stale_export_num));
1986         }
1987         RETURN(exp);
1988 }
1989 EXPORT_SYMBOL(obd_stale_export_get);
1990
1991 void obd_stale_export_put(struct obd_export *exp)
1992 {
1993         ENTRY;
1994
1995         LASSERT(list_empty(&exp->exp_stale_list));
1996         if (exp->exp_lock_hash &&
1997             atomic_read(&exp->exp_lock_hash->hs_count)) {
1998                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1999                        atomic_read(&obd_stale_export_num));
2000
2001                 spin_lock_bh(&exp->exp_bl_list_lock);
2002                 spin_lock(&obd_stale_export_lock);
2003                 /* Add to the tail if there is no blocked locks,
2004                  * to the head otherwise. */
2005                 if (list_empty(&exp->exp_bl_list))
2006                         list_add_tail(&exp->exp_stale_list,
2007                                       &obd_stale_exports);
2008                 else
2009                         list_add(&exp->exp_stale_list,
2010                                  &obd_stale_exports);
2011
2012                 spin_unlock(&obd_stale_export_lock);
2013                 spin_unlock_bh(&exp->exp_bl_list_lock);
2014         } else {
2015                 class_export_put(exp);
2016         }
2017         EXIT;
2018 }
2019 EXPORT_SYMBOL(obd_stale_export_put);
2020
2021 /**
2022  * Adjust the position of the export in the stale list,
2023  * i.e. move to the head of the list if is needed.
2024  **/
2025 void obd_stale_export_adjust(struct obd_export *exp)
2026 {
2027         LASSERT(exp != NULL);
2028         spin_lock_bh(&exp->exp_bl_list_lock);
2029         spin_lock(&obd_stale_export_lock);
2030
2031         if (!list_empty(&exp->exp_stale_list) &&
2032             !list_empty(&exp->exp_bl_list))
2033                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2034
2035         spin_unlock(&obd_stale_export_lock);
2036         spin_unlock_bh(&exp->exp_bl_list_lock);
2037 }
2038 EXPORT_SYMBOL(obd_stale_export_adjust);
2039
2040 /**
2041  * destroy zombie export/import thread.
2042  */
2043 static int obd_zombie_impexp_thread(void *unused)
2044 {
2045         unshare_fs_struct();
2046         complete(&obd_zombie_start);
2047
2048         obd_zombie_pid = current_pid();
2049
2050         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2051                 struct l_wait_info lwi = { 0 };
2052
2053                 l_wait_event(obd_zombie_waitq,
2054                              !obd_zombie_impexp_check(NULL), &lwi);
2055                 obd_zombie_impexp_cull();
2056
2057                 /*
2058                  * Notify obd_zombie_barrier callers that queues
2059                  * may be empty.
2060                  */
2061                 wake_up(&obd_zombie_waitq);
2062         }
2063
2064         complete(&obd_zombie_stop);
2065
2066         RETURN(0);
2067 }
2068
2069
2070 /**
2071  * start destroy zombie import/export thread
2072  */
2073 int obd_zombie_impexp_init(void)
2074 {
2075         struct task_struct *task;
2076
2077         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2078         if (IS_ERR(task))
2079                 RETURN(PTR_ERR(task));
2080
2081         wait_for_completion(&obd_zombie_start);
2082         RETURN(0);
2083 }
2084 /**
2085  * stop destroy zombie import/export thread
2086  */
2087 void obd_zombie_impexp_stop(void)
2088 {
2089         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2090         obd_zombie_impexp_notify();
2091         wait_for_completion(&obd_zombie_stop);
2092         LASSERT(list_empty(&obd_stale_exports));
2093 }
2094
2095 /***** Kernel-userspace comm helpers *******/
2096
2097 /* Get length of entire message, including header */
2098 int kuc_len(int payload_len)
2099 {
2100         return sizeof(struct kuc_hdr) + payload_len;
2101 }
2102 EXPORT_SYMBOL(kuc_len);
2103
2104 /* Get a pointer to kuc header, given a ptr to the payload
2105  * @param p Pointer to payload area
2106  * @returns Pointer to kuc header
2107  */
2108 struct kuc_hdr * kuc_ptr(void *p)
2109 {
2110         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2111         LASSERT(lh->kuc_magic == KUC_MAGIC);
2112         return lh;
2113 }
2114 EXPORT_SYMBOL(kuc_ptr);
2115
2116 /* Alloc space for a message, and fill in header
2117  * @return Pointer to payload area
2118  */
2119 void *kuc_alloc(int payload_len, int transport, int type)
2120 {
2121         struct kuc_hdr *lh;
2122         int len = kuc_len(payload_len);
2123
2124         OBD_ALLOC(lh, len);
2125         if (lh == NULL)
2126                 return ERR_PTR(-ENOMEM);
2127
2128         lh->kuc_magic = KUC_MAGIC;
2129         lh->kuc_transport = transport;
2130         lh->kuc_msgtype = type;
2131         lh->kuc_msglen = len;
2132
2133         return (void *)(lh + 1);
2134 }
2135 EXPORT_SYMBOL(kuc_alloc);
2136
/* Free a message given a pointer to its payload area.
 * @param p Pointer to payload area
 * @param payload_len Payload size; with the header size it gives the
 *                    length passed to OBD_FREE
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int msg_len = kuc_len(payload_len);

	OBD_FREE(hdr, msg_len);
}
2143 EXPORT_SYMBOL(kuc_free);
2144
2145 struct obd_request_slot_waiter {
2146         struct list_head        orsw_entry;
2147         wait_queue_head_t       orsw_waitq;
2148         bool                    orsw_signaled;
2149 };
2150
2151 static bool obd_request_slot_avail(struct client_obd *cli,
2152                                    struct obd_request_slot_waiter *orsw)
2153 {
2154         bool avail;
2155
2156         spin_lock(&cli->cl_loi_list_lock);
2157         avail = !!list_empty(&orsw->orsw_entry);
2158         spin_unlock(&cli->cl_loi_list_lock);
2159
2160         return avail;
2161 };
2162
2163 /*
2164  * For network flow control, the RPC sponsor needs to acquire a credit
2165  * before sending the RPC. The credits count for a connection is defined
2166  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2167  * the subsequent RPC sponsors need to wait until others released their
2168  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2169  */
2170 int obd_get_request_slot(struct client_obd *cli)
2171 {
2172         struct obd_request_slot_waiter   orsw;
2173         struct l_wait_info               lwi;
2174         int                              rc;
2175
2176         spin_lock(&cli->cl_loi_list_lock);
2177         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
2178                 cli->cl_r_in_flight++;
2179                 spin_unlock(&cli->cl_loi_list_lock);
2180                 return 0;
2181         }
2182
2183         init_waitqueue_head(&orsw.orsw_waitq);
2184         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2185         orsw.orsw_signaled = false;
2186         spin_unlock(&cli->cl_loi_list_lock);
2187
2188         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2189         rc = l_wait_event(orsw.orsw_waitq,
2190                           obd_request_slot_avail(cli, &orsw) ||
2191                           orsw.orsw_signaled,
2192                           &lwi);
2193
2194         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2195          * freed but other (such as obd_put_request_slot) is using it. */
2196         spin_lock(&cli->cl_loi_list_lock);
2197         if (rc != 0) {
2198                 if (!orsw.orsw_signaled) {
2199                         if (list_empty(&orsw.orsw_entry))
2200                                 cli->cl_r_in_flight--;
2201                         else
2202                                 list_del(&orsw.orsw_entry);
2203                 }
2204         }
2205
2206         if (orsw.orsw_signaled) {
2207                 LASSERT(list_empty(&orsw.orsw_entry));
2208
2209                 rc = -EINTR;
2210         }
2211         spin_unlock(&cli->cl_loi_list_lock);
2212
2213         return rc;
2214 }
2215 EXPORT_SYMBOL(obd_get_request_slot);
2216
2217 void obd_put_request_slot(struct client_obd *cli)
2218 {
2219         struct obd_request_slot_waiter *orsw;
2220
2221         spin_lock(&cli->cl_loi_list_lock);
2222         cli->cl_r_in_flight--;
2223
2224         /* If there is free slot, wakeup the first waiter. */
2225         if (!list_empty(&cli->cl_loi_read_list) &&
2226             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2227                 orsw = list_entry(cli->cl_loi_read_list.next,
2228                                   struct obd_request_slot_waiter, orsw_entry);
2229                 list_del_init(&orsw->orsw_entry);
2230                 cli->cl_r_in_flight++;
2231                 wake_up(&orsw->orsw_waitq);
2232         }
2233         spin_unlock(&cli->cl_loi_list_lock);
2234 }
2235 EXPORT_SYMBOL(obd_put_request_slot);
2236
2237 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2238 {
2239         return cli->cl_max_rpcs_in_flight;
2240 }
2241 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2242
2243 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2244 {
2245         struct obd_request_slot_waiter *orsw;
2246         __u32                           old;
2247         int                             diff;
2248         int                             i;
2249         char                            *typ_name;
2250         int                             rc;
2251
2252         if (max > OBD_MAX_RIF_MAX || max < 1)
2253                 return -ERANGE;
2254
2255         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2256         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2257                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2258                  * strictly lower that max_rpcs_in_flight */
2259                 if (max < 2) {
2260                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2261                                "because it must be higher than "
2262                                "max_mod_rpcs_in_flight value",
2263                                cli->cl_import->imp_obd->obd_name);
2264                         return -ERANGE;
2265                 }
2266                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2267                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2268                         if (rc != 0)
2269                                 return rc;
2270                 }
2271         }
2272
2273         spin_lock(&cli->cl_loi_list_lock);
2274         old = cli->cl_max_rpcs_in_flight;
2275         cli->cl_max_rpcs_in_flight = max;
2276         diff = max - old;
2277
2278         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2279         for (i = 0; i < diff; i++) {
2280                 if (list_empty(&cli->cl_loi_read_list))
2281                         break;
2282
2283                 orsw = list_entry(cli->cl_loi_read_list.next,
2284                                   struct obd_request_slot_waiter, orsw_entry);
2285                 list_del_init(&orsw->orsw_entry);
2286                 cli->cl_r_in_flight++;
2287                 wake_up(&orsw->orsw_waitq);
2288         }
2289         spin_unlock(&cli->cl_loi_list_lock);
2290
2291         return 0;
2292 }
2293 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2294
2295 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2296 {
2297         return cli->cl_max_mod_rpcs_in_flight;
2298 }
2299 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2300
2301 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2302 {
2303         struct obd_connect_data *ocd;
2304         __u16 maxmodrpcs;
2305         __u16 prev;
2306
2307         if (max > OBD_MAX_RIF_MAX || max < 1)
2308                 return -ERANGE;
2309
2310         /* cannot exceed or equal max_rpcs_in_flight */
2311         if (max >= cli->cl_max_rpcs_in_flight) {
2312                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2313                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2314                        cli->cl_import->imp_obd->obd_name,
2315                        max, cli->cl_max_rpcs_in_flight);
2316                 return -ERANGE;
2317         }
2318
2319         /* cannot exceed max modify RPCs in flight supported by the server */
2320         ocd = &cli->cl_import->imp_connect_data;
2321         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2322                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2323         else
2324                 maxmodrpcs = 1;
2325         if (max > maxmodrpcs) {
2326                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2327                        "higher than max_mod_rpcs_per_client value (%hu) "
2328                        "returned by the server at connection\n",
2329                        cli->cl_import->imp_obd->obd_name,
2330                        max, maxmodrpcs);
2331                 return -ERANGE;
2332         }
2333
2334         spin_lock(&cli->cl_mod_rpcs_lock);
2335
2336         prev = cli->cl_max_mod_rpcs_in_flight;
2337         cli->cl_max_mod_rpcs_in_flight = max;
2338
2339         /* wakeup waiters if limit has been increased */
2340         if (cli->cl_max_mod_rpcs_in_flight > prev)
2341                 wake_up(&cli->cl_mod_rpcs_waitq);
2342
2343         spin_unlock(&cli->cl_mod_rpcs_lock);
2344
2345         return 0;
2346 }
2347 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2348
2349
2350 #define pct(a, b) (b ? a * 100 / b : 0)
2351 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2352                                struct seq_file *seq)
2353 {
2354         unsigned long mod_tot = 0, mod_cum;
2355         struct timespec64 now;
2356         int i;
2357
2358         ktime_get_real_ts64(&now);
2359
2360         spin_lock(&cli->cl_mod_rpcs_lock);
2361
2362         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2363                    (s64)now.tv_sec, now.tv_nsec);
2364         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2365                    cli->cl_mod_rpcs_in_flight);
2366
2367         seq_printf(seq, "\n\t\t\tmodify\n");
2368         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2369
2370         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2371
2372         mod_cum = 0;
2373         for (i = 0; i < OBD_HIST_MAX; i++) {
2374                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2375                 mod_cum += mod;
2376                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2377                            i, mod, pct(mod, mod_tot),
2378                            pct(mod_cum, mod_tot));
2379                 if (mod_cum == mod_tot)
2380                         break;
2381         }
2382
2383         spin_unlock(&cli->cl_mod_rpcs_lock);
2384
2385         return 0;
2386 }
2387 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2388 #undef pct
2389
2390
2391 /* The number of modify RPCs sent in parallel is limited
2392  * because the server has a finite number of slots per client to
2393  * store request result and ensure reply reconstruction when needed.
2394  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2395  * that takes into account server limit and cl_max_rpcs_in_flight
2396  * value.
2397  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2398  * one close request is allowed above the maximum.
2399  */
2400 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2401                                                  bool close_req)
2402 {
2403         bool avail;
2404
2405         /* A slot is available if
2406          * - number of modify RPCs in flight is less than the max
2407          * - it's a close RPC and no other close request is in flight
2408          */
2409         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2410                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2411
2412         return avail;
2413 }
2414
2415 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2416                                          bool close_req)
2417 {
2418         bool avail;
2419
2420         spin_lock(&cli->cl_mod_rpcs_lock);
2421         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2422         spin_unlock(&cli->cl_mod_rpcs_lock);
2423         return avail;
2424 }
2425
2426 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2427 {
2428         if (it != NULL &&
2429             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2430              it->it_op == IT_READDIR ||
2431              (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2432                         return true;
2433         return false;
2434 }
2435
2436 /* Get a modify RPC slot from the obd client @cli according
2437  * to the kind of operation @opc that is going to be sent
2438  * and the intent @it of the operation if it applies.
2439  * If the maximum number of modify RPCs in flight is reached
2440  * the thread is put to sleep.
2441  * Returns the tag to be set in the request message. Tag 0
2442  * is reserved for non-modifying requests.
2443  */
2444 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2445                            struct lookup_intent *it)
2446 {
2447         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2448         bool                    close_req = false;
2449         __u16                   i, max;
2450
2451         /* read-only metadata RPCs don't consume a slot on MDT
2452          * for reply reconstruction
2453          */
2454         if (obd_skip_mod_rpc_slot(it))
2455                 return 0;
2456
2457         if (opc == MDS_CLOSE)
2458                 close_req = true;
2459
2460         do {
2461                 spin_lock(&cli->cl_mod_rpcs_lock);
2462                 max = cli->cl_max_mod_rpcs_in_flight;
2463                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2464                         /* there is a slot available */
2465                         cli->cl_mod_rpcs_in_flight++;
2466                         if (close_req)
2467                                 cli->cl_close_rpcs_in_flight++;
2468                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2469                                          cli->cl_mod_rpcs_in_flight);
2470                         /* find a free tag */
2471                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2472                                                 max + 1);
2473                         LASSERT(i < OBD_MAX_RIF_MAX);
2474                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2475                         spin_unlock(&cli->cl_mod_rpcs_lock);
2476                         /* tag 0 is reserved for non-modify RPCs */
2477                         return i + 1;
2478                 }
2479                 spin_unlock(&cli->cl_mod_rpcs_lock);
2480
2481                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2482                        "opc %u, max %hu\n",
2483                        cli->cl_import->imp_obd->obd_name, opc, max);
2484
2485                 l_wait_event(cli->cl_mod_rpcs_waitq,
2486                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2487         } while (true);
2488 }
2489 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2490
2491 /* Put a modify RPC slot from the obd client @cli according
2492  * to the kind of operation @opc that has been sent and the
2493  * intent @it of the operation if it applies.
2494  */
2495 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2496                           struct lookup_intent *it, __u16 tag)
2497 {
2498         bool                    close_req = false;
2499
2500         if (obd_skip_mod_rpc_slot(it))
2501                 return;
2502
2503         if (opc == MDS_CLOSE)
2504                 close_req = true;
2505
2506         spin_lock(&cli->cl_mod_rpcs_lock);
2507         cli->cl_mod_rpcs_in_flight--;
2508         if (close_req)
2509                 cli->cl_close_rpcs_in_flight--;
2510         /* release the tag in the bitmap */
2511         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2512         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2513         spin_unlock(&cli->cl_mod_rpcs_lock);
2514         wake_up(&cli->cl_mod_rpcs_waitq);
2515 }
2516 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2517