Whamcloud - gitweb
2bfa78154e28f591329d38ed06b72e874b654c14
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Held while walking or modifying the obd_types list. */
static DEFINE_SPINLOCK(obd_types_lock);
/* Registered obd types, linked through obd_type::typ_chain. */
static LIST_HEAD(obd_types);
/* Taken around lookups in and updates of obd_devs[]. */
DEFINE_RWLOCK(obd_dev_lock);
/* Device table indexed by obd_minor; unused slots are NULL. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device instances are allocated from. */
static struct kmem_cache *obd_device_cachep;

static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Exported function-pointer hook; no assignment is visible in this part of the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         OBD_FREE(kobj, sizeof(*kobj));
164 }
165
166 static struct kobj_type class_ktype = {
167         .sysfs_ops      = &lustre_sysfs_ops,
168         .release        = class_sysfs_release,
169 };
170
171 struct kobject *class_setup_tunables(const char *name)
172 {
173         struct kobject *kobj;
174         int rc;
175
176 #ifdef HAVE_SERVER_SUPPORT
177         kobj = kset_find_obj(lustre_kset, name);
178         if (kobj)
179                 return kobj;
180 #endif
181         OBD_ALLOC(kobj, sizeof(*kobj));
182         if (!kobj)
183                 return ERR_PTR(-ENOMEM);
184
185         kobj->kset = lustre_kset;
186         kobject_init(kobj, &class_ktype);
187         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
188         if (rc) {
189                 kobject_put(kobj);
190                 return ERR_PTR(rc);
191         }
192         return kobj;
193 }
194 EXPORT_SYMBOL(class_setup_tunables);
195
196 #define CLASS_MAX_NAME 1024
197
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199                         bool enable_proc, struct lprocfs_vars *vars,
200                         const char *name, struct lu_device_type *ldt)
201 {
202         struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
204         struct qstr dname;
205 #endif /* HAVE_SERVER_SUPPORT */
206         int rc = 0;
207
208         ENTRY;
209         /* sanity check */
210         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
211
212         if (class_search_type(name)) {
213                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
214                 RETURN(-EEXIST);
215         }
216
217         rc = -ENOMEM;
218         OBD_ALLOC(type, sizeof(*type));
219         if (type == NULL)
220                 RETURN(rc);
221
222         OBD_ALLOC_PTR(type->typ_dt_ops);
223         OBD_ALLOC_PTR(type->typ_md_ops);
224         OBD_ALLOC(type->typ_name, strlen(name) + 1);
225
226         if (type->typ_dt_ops == NULL ||
227             type->typ_md_ops == NULL ||
228             type->typ_name == NULL)
229                 GOTO (failed, rc);
230
231         *(type->typ_dt_ops) = *dt_ops;
232         /* md_ops is optional */
233         if (md_ops)
234                 *(type->typ_md_ops) = *md_ops;
235         strcpy(type->typ_name, name);
236         spin_lock_init(&type->obd_type_lock);
237
238 #ifdef CONFIG_PROC_FS
239         if (enable_proc) {
240                 type->typ_procroot = lprocfs_register(type->typ_name,
241                                                       proc_lustre_root,
242                                                       vars, type);
243                 if (IS_ERR(type->typ_procroot)) {
244                         rc = PTR_ERR(type->typ_procroot);
245                         type->typ_procroot = NULL;
246                         GOTO(failed, rc);
247                 }
248         }
249 #endif
250 #ifdef HAVE_SERVER_SUPPORT
251         dname.name = name;
252         dname.len = strlen(dname.name);
253         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
254                                        dname.len);
255         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256         if (type->typ_debugfs_entry) {
257                 dput(type->typ_debugfs_entry);
258                 type->typ_sym_filter = true;
259                 goto dir_exist;
260         }
261 #endif /* HAVE_SERVER_SUPPORT */
262
263         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
264                                                     debugfs_lustre_root,
265                                                     NULL, type);
266         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
268                                              : -ENOMEM;
269                 type->typ_debugfs_entry = NULL;
270                 GOTO(failed, rc);
271         }
272 #ifdef HAVE_SERVER_SUPPORT
273 dir_exist:
274 #endif
275         type->typ_kobj = class_setup_tunables(type->typ_name);
276         if (IS_ERR(type->typ_kobj))
277                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
278
279         if (ldt) {
280                 type->typ_lu = ldt;
281                 rc = lu_device_type_init(ldt);
282                 if (rc) {
283                         kobject_put(type->typ_kobj);
284                         GOTO(failed, rc);
285                 }
286         }
287
288         spin_lock(&obd_types_lock);
289         list_add(&type->typ_chain, &obd_types);
290         spin_unlock(&obd_types_lock);
291
292         RETURN(0);
293
294 failed:
295 #ifdef HAVE_SERVER_SUPPORT
296         if (type->typ_sym_filter)
297                 type->typ_debugfs_entry = NULL;
298 #endif
299         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300                 ldebugfs_remove(&type->typ_debugfs_entry);
301         if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303                 if (type->typ_procroot != NULL)
304                         remove_proc_subtree(type->typ_name, proc_lustre_root);
305 #endif
306                 OBD_FREE(type->typ_name, strlen(name) + 1);
307         }
308         if (type->typ_md_ops != NULL)
309                 OBD_FREE_PTR(type->typ_md_ops);
310         if (type->typ_dt_ops != NULL)
311                 OBD_FREE_PTR(type->typ_dt_ops);
312         OBD_FREE(type, sizeof(*type));
313         RETURN(rc);
314 }
315 EXPORT_SYMBOL(class_register_type);
316
317 int class_unregister_type(const char *name)
318 {
319         struct obd_type *type = class_search_type(name);
320         ENTRY;
321
322         if (!type) {
323                 CERROR("unknown obd type\n");
324                 RETURN(-EINVAL);
325         }
326
327         if (type->typ_refcnt) {
328                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329                 /* This is a bad situation, let's make the best of it */
330                 /* Remove ops, but leave the name for debugging */
331                 OBD_FREE_PTR(type->typ_dt_ops);
332                 OBD_FREE_PTR(type->typ_md_ops);
333                 RETURN(-EBUSY);
334         }
335
336         kobject_put(type->typ_kobj);
337
338         /* we do not use type->typ_procroot as for compatibility purposes
339          * other modules can share names (i.e. lod can use lov entry). so
340          * we can't reference pointer as it can get invalided when another
341          * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343         if (type->typ_procroot != NULL)
344                 remove_proc_subtree(type->typ_name, proc_lustre_root);
345         if (type->typ_procsym != NULL)
346                 lprocfs_remove(&type->typ_procsym);
347 #endif
348 #ifdef HAVE_SERVER_SUPPORT
349         if (type->typ_sym_filter)
350                 type->typ_debugfs_entry = NULL;
351 #endif
352         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353                 ldebugfs_remove(&type->typ_debugfs_entry);
354
355         if (type->typ_lu)
356                 lu_device_type_fini(type->typ_lu);
357
358         spin_lock(&obd_types_lock);
359         list_del(&type->typ_chain);
360         spin_unlock(&obd_types_lock);
361         OBD_FREE(type->typ_name, strlen(name) + 1);
362         if (type->typ_dt_ops != NULL)
363                 OBD_FREE_PTR(type->typ_dt_ops);
364         if (type->typ_md_ops != NULL)
365                 OBD_FREE_PTR(type->typ_md_ops);
366         OBD_FREE(type, sizeof(*type));
367         RETURN(0);
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
370
371 /**
372  * Create a new obd device.
373  *
374  * Allocate the new obd_device and initialize it.
375  *
376  * \param[in] type_name obd device type string.
377  * \param[in] name      obd device name.
378  * \param[in] uuid      obd device UUID
379  *
380  * \retval newdev         pointer to created obd_device
381  * \retval ERR_PTR(errno) on error
382  */
383 struct obd_device *class_newdev(const char *type_name, const char *name,
384                                 const char *uuid)
385 {
386         struct obd_device *newdev;
387         struct obd_type *type = NULL;
388         ENTRY;
389
390         if (strlen(name) >= MAX_OBD_NAME) {
391                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392                 RETURN(ERR_PTR(-EINVAL));
393         }
394
395         type = class_get_type(type_name);
396         if (type == NULL){
397                 CERROR("OBD: unknown type: %s\n", type_name);
398                 RETURN(ERR_PTR(-ENODEV));
399         }
400
401         newdev = obd_device_alloc();
402         if (newdev == NULL) {
403                 class_put_type(type);
404                 RETURN(ERR_PTR(-ENOMEM));
405         }
406         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408         newdev->obd_type = type;
409         newdev->obd_minor = -1;
410
411         rwlock_init(&newdev->obd_pool_lock);
412         newdev->obd_pool_limit = 0;
413         newdev->obd_pool_slv = 0;
414
415         INIT_LIST_HEAD(&newdev->obd_exports);
416         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418         INIT_LIST_HEAD(&newdev->obd_exports_timed);
419         INIT_LIST_HEAD(&newdev->obd_nid_stats);
420         spin_lock_init(&newdev->obd_nid_lock);
421         spin_lock_init(&newdev->obd_dev_lock);
422         mutex_init(&newdev->obd_dev_mutex);
423         spin_lock_init(&newdev->obd_osfs_lock);
424         /* newdev->obd_osfs_age must be set to a value in the distant
425          * past to guarantee a fresh statfs is fetched on mount. */
426         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
427
428         /* XXX belongs in setup not attach  */
429         init_rwsem(&newdev->obd_observer_link_sem);
430         /* recovery data */
431         spin_lock_init(&newdev->obd_recovery_task_lock);
432         init_waitqueue_head(&newdev->obd_next_transno_waitq);
433         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437         INIT_LIST_HEAD(&newdev->obd_evict_list);
438         INIT_LIST_HEAD(&newdev->obd_lwp_list);
439
440         llog_group_init(&newdev->obd_olg);
441         /* Detach drops this */
442         atomic_set(&newdev->obd_refcount, 1);
443         lu_ref_init(&newdev->obd_reference);
444         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
445
446         newdev->obd_conn_inprogress = 0;
447
448         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
449
450         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451                newdev->obd_name, newdev);
452
453         return newdev;
454 }
455
456 /**
457  * Free obd device.
458  *
459  * \param[in] obd obd_device to be freed
460  *
461  * \retval none
462  */
463 void class_free_dev(struct obd_device *obd)
464 {
465         struct obd_type *obd_type = obd->obd_type;
466
467         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470                  "obd %p != obd_devs[%d] %p\n",
471                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473                  "obd_refcount should be 0, not %d\n",
474                  atomic_read(&obd->obd_refcount));
475         LASSERT(obd_type != NULL);
476
477         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478                obd->obd_name, obd->obd_type->typ_name);
479
480         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481                          obd->obd_name, obd->obd_uuid.uuid);
482         if (obd->obd_stopping) {
483                 int err;
484
485                 /* If we're not stopping, we were never set up */
486                 err = obd_cleanup(obd);
487                 if (err)
488                         CERROR("Cleanup %s returned %d\n",
489                                 obd->obd_name, err);
490         }
491
492         obd_device_free(obd);
493
494         class_put_type(obd_type);
495 }
496
497 /**
498  * Unregister obd device.
499  *
500  * Free slot in obd_dev[] used by \a obd.
501  *
502  * \param[in] new_obd obd_device to be unregistered
503  *
504  * \retval none
505  */
506 void class_unregister_device(struct obd_device *obd)
507 {
508         write_lock(&obd_dev_lock);
509         if (obd->obd_minor >= 0) {
510                 LASSERT(obd_devs[obd->obd_minor] == obd);
511                 obd_devs[obd->obd_minor] = NULL;
512                 obd->obd_minor = -1;
513         }
514         write_unlock(&obd_dev_lock);
515 }
516
517 /**
518  * Register obd device.
519  *
520  * Find free slot in obd_devs[], fills it with \a new_obd.
521  *
522  * \param[in] new_obd obd_device to be registered
523  *
524  * \retval 0          success
525  * \retval -EEXIST    device with this name is registered
526  * \retval -EOVERFLOW obd_devs[] is full
527  */
528 int class_register_device(struct obd_device *new_obd)
529 {
530         int ret = 0;
531         int i;
532         int new_obd_minor = 0;
533         bool minor_assign = false;
534         bool retried = false;
535
536 again:
537         write_lock(&obd_dev_lock);
538         for (i = 0; i < class_devno_max(); i++) {
539                 struct obd_device *obd = class_num2obd(i);
540
541                 if (obd != NULL &&
542                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
543
544                         if (!retried) {
545                                 write_unlock(&obd_dev_lock);
546
547                                 /* the obd_device could be waited to be
548                                  * destroyed by the "obd_zombie_impexp_thread".
549                                  */
550                                 obd_zombie_barrier();
551                                 retried = true;
552                                 goto again;
553                         }
554
555                         CERROR("%s: already exists, won't add\n",
556                                obd->obd_name);
557                         /* in case we found a free slot before duplicate */
558                         minor_assign = false;
559                         ret = -EEXIST;
560                         break;
561                 }
562                 if (!minor_assign && obd == NULL) {
563                         new_obd_minor = i;
564                         minor_assign = true;
565                 }
566         }
567
568         if (minor_assign) {
569                 new_obd->obd_minor = new_obd_minor;
570                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572                 obd_devs[new_obd_minor] = new_obd;
573         } else {
574                 if (ret == 0) {
575                         ret = -EOVERFLOW;
576                         CERROR("%s: all %u/%u devices used, increase "
577                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578                                i, class_devno_max(), ret);
579                 }
580         }
581         write_unlock(&obd_dev_lock);
582
583         RETURN(ret);
584 }
585
586 static int class_name2dev_nolock(const char *name)
587 {
588         int i;
589
590         if (!name)
591                 return -1;
592
593         for (i = 0; i < class_devno_max(); i++) {
594                 struct obd_device *obd = class_num2obd(i);
595
596                 if (obd && strcmp(name, obd->obd_name) == 0) {
597                         /* Make sure we finished attaching before we give
598                            out any references */
599                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600                         if (obd->obd_attached) {
601                                 return i;
602                         }
603                         break;
604                 }
605         }
606
607         return -1;
608 }
609
610 int class_name2dev(const char *name)
611 {
612         int i;
613
614         if (!name)
615                 return -1;
616
617         read_lock(&obd_dev_lock);
618         i = class_name2dev_nolock(name);
619         read_unlock(&obd_dev_lock);
620
621         return i;
622 }
623 EXPORT_SYMBOL(class_name2dev);
624
625 struct obd_device *class_name2obd(const char *name)
626 {
627         int dev = class_name2dev(name);
628
629         if (dev < 0 || dev > class_devno_max())
630                 return NULL;
631         return class_num2obd(dev);
632 }
633 EXPORT_SYMBOL(class_name2obd);
634
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
636 {
637         int i;
638
639         for (i = 0; i < class_devno_max(); i++) {
640                 struct obd_device *obd = class_num2obd(i);
641
642                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
644                         return i;
645                 }
646         }
647
648         return -1;
649 }
650
651 int class_uuid2dev(struct obd_uuid *uuid)
652 {
653         int i;
654
655         read_lock(&obd_dev_lock);
656         i = class_uuid2dev_nolock(uuid);
657         read_unlock(&obd_dev_lock);
658
659         return i;
660 }
661 EXPORT_SYMBOL(class_uuid2dev);
662
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
664 {
665         int dev = class_uuid2dev(uuid);
666         if (dev < 0)
667                 return NULL;
668         return class_num2obd(dev);
669 }
670 EXPORT_SYMBOL(class_uuid2obd);
671
672 /**
673  * Get obd device from ::obd_devs[]
674  *
675  * \param num [in] array index
676  *
677  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678  *         otherwise return the obd device there.
679  */
680 struct obd_device *class_num2obd(int num)
681 {
682         struct obd_device *obd = NULL;
683
684         if (num < class_devno_max()) {
685                 obd = obd_devs[num];
686                 if (obd == NULL)
687                         return NULL;
688
689                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690                          "%p obd_magic %08x != %08x\n",
691                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692                 LASSERTF(obd->obd_minor == num,
693                          "%p obd_minor %0d != %0d\n",
694                          obd, obd->obd_minor, num);
695         }
696
697         return obd;
698 }
699
700 /**
701  * Find obd in obd_dev[] by name or uuid.
702  *
703  * Increment obd's refcount if found.
704  *
705  * \param[in] str obd name or uuid
706  *
707  * \retval NULL    if not found
708  * \retval target  pointer to found obd_device
709  */
710 struct obd_device *class_dev_by_str(const char *str)
711 {
712         struct obd_device *target = NULL;
713         struct obd_uuid tgtuuid;
714         int rc;
715
716         obd_str2uuid(&tgtuuid, str);
717
718         read_lock(&obd_dev_lock);
719         rc = class_uuid2dev_nolock(&tgtuuid);
720         if (rc < 0)
721                 rc = class_name2dev_nolock(str);
722
723         if (rc >= 0)
724                 target = class_num2obd(rc);
725
726         if (target != NULL)
727                 class_incref(target, "find", current);
728         read_unlock(&obd_dev_lock);
729
730         RETURN(target);
731 }
732 EXPORT_SYMBOL(class_dev_by_str);
733
734 /**
735  * Get obd devices count. Device in any
736  *    state are counted
737  * \retval obd device count
738  */
739 int get_devices_count(void)
740 {
741         int index, max_index = class_devno_max(), dev_count = 0;
742
743         read_lock(&obd_dev_lock);
744         for (index = 0; index <= max_index; index++) {
745                 struct obd_device *obd = class_num2obd(index);
746                 if (obd != NULL)
747                         dev_count++;
748         }
749         read_unlock(&obd_dev_lock);
750
751         return dev_count;
752 }
753 EXPORT_SYMBOL(get_devices_count);
754
755 void class_obd_list(void)
756 {
757         char *status;
758         int i;
759
760         read_lock(&obd_dev_lock);
761         for (i = 0; i < class_devno_max(); i++) {
762                 struct obd_device *obd = class_num2obd(i);
763
764                 if (obd == NULL)
765                         continue;
766                 if (obd->obd_stopping)
767                         status = "ST";
768                 else if (obd->obd_set_up)
769                         status = "UP";
770                 else if (obd->obd_attached)
771                         status = "AT";
772                 else
773                         status = "--";
774                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775                          i, status, obd->obd_type->typ_name,
776                          obd->obd_name, obd->obd_uuid.uuid,
777                          atomic_read(&obd->obd_refcount));
778         }
779         read_unlock(&obd_dev_lock);
780         return;
781 }
782
783 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
784    specified, then only the client with that uuid is returned,
785    otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787                                           const char * typ_name,
788                                           struct obd_uuid *grp_uuid)
789 {
790         int i;
791
792         read_lock(&obd_dev_lock);
793         for (i = 0; i < class_devno_max(); i++) {
794                 struct obd_device *obd = class_num2obd(i);
795
796                 if (obd == NULL)
797                         continue;
798                 if ((strncmp(obd->obd_type->typ_name, typ_name,
799                              strlen(typ_name)) == 0)) {
800                         if (obd_uuid_equals(tgt_uuid,
801                                             &obd->u.cli.cl_target_uuid) &&
802                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
803                                                          &obd->obd_uuid) : 1)) {
804                                 read_unlock(&obd_dev_lock);
805                                 return obd;
806                         }
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_find_client_obd);
814
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816    searching at *next, and if a device is found, the next index to look
817    at is saved in *next. If next is NULL, then the first matching device
818    will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
820 {
821         int i;
822
823         if (next == NULL)
824                 i = 0;
825         else if (*next >= 0 && *next < class_devno_max())
826                 i = *next;
827         else
828                 return NULL;
829
830         read_lock(&obd_dev_lock);
831         for (; i < class_devno_max(); i++) {
832                 struct obd_device *obd = class_num2obd(i);
833
834                 if (obd == NULL)
835                         continue;
836                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
837                         if (next != NULL)
838                                 *next = i+1;
839                         read_unlock(&obd_dev_lock);
840                         return obd;
841                 }
842         }
843         read_unlock(&obd_dev_lock);
844
845         return NULL;
846 }
847 EXPORT_SYMBOL(class_devices_in_group);
848
849 /**
850  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851  * adjust sptlrpc settings accordingly.
852  */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
854 {
855         struct obd_device  *obd;
856         const char         *type;
857         int                 i, rc = 0, rc2;
858
859         LASSERT(namelen > 0);
860
861         read_lock(&obd_dev_lock);
862         for (i = 0; i < class_devno_max(); i++) {
863                 obd = class_num2obd(i);
864
865                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
866                         continue;
867
868                 /* only notify mdc, osc, osp, lwp, mdt, ost
869                  * because only these have a -sptlrpc llog */
870                 type = obd->obd_type->typ_name;
871                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OST_NAME) != 0)
877                         continue;
878
879                 if (strncmp(obd->obd_name, fsname, namelen))
880                         continue;
881
882                 class_incref(obd, __FUNCTION__, obd);
883                 read_unlock(&obd_dev_lock);
884                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885                                          sizeof(KEY_SPTLRPC_CONF),
886                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
887                 rc = rc ? rc : rc2;
888                 class_decref(obd, __FUNCTION__, obd);
889                 read_lock(&obd_dev_lock);
890         }
891         read_unlock(&obd_dev_lock);
892         return rc;
893 }
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
895
896 void obd_cleanup_caches(void)
897 {
898         ENTRY;
899         if (obd_device_cachep) {
900                 kmem_cache_destroy(obd_device_cachep);
901                 obd_device_cachep = NULL;
902         }
903
904         EXIT;
905 }
906
907 int obd_init_caches(void)
908 {
909         int rc;
910         ENTRY;
911
912         LASSERT(obd_device_cachep == NULL);
913         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
914                                               sizeof(struct obd_device),
915                                               0, 0, NULL);
916         if (!obd_device_cachep)
917                 GOTO(out, rc = -ENOMEM);
918
919         RETURN(0);
920 out:
921         obd_cleanup_caches();
922         RETURN(rc);
923 }
924
925 /* map connection to client */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
927 {
928         struct obd_export *export;
929         ENTRY;
930
931         if (!conn) {
932                 CDEBUG(D_CACHE, "looking for null handle\n");
933                 RETURN(NULL);
934         }
935
936         if (conn->cookie == -1) {  /* this means assign a new connection */
937                 CDEBUG(D_CACHE, "want a new connection\n");
938                 RETURN(NULL);
939         }
940
941         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942         export = class_handle2object(conn->cookie, NULL);
943         RETURN(export);
944 }
945 EXPORT_SYMBOL(class_conn2export);
946
947 struct obd_device *class_exp2obd(struct obd_export *exp)
948 {
949         if (exp)
950                 return exp->exp_obd;
951         return NULL;
952 }
953 EXPORT_SYMBOL(class_exp2obd);
954
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
956 {
957         struct obd_device *obd = exp->exp_obd;
958         if (obd == NULL)
959                 return NULL;
960         return obd->u.cli.cl_import;
961 }
962 EXPORT_SYMBOL(class_exp2cliimp);
963
964 /* Export management functions */
965 static void class_export_destroy(struct obd_export *exp)
966 {
967         struct obd_device *obd = exp->exp_obd;
968         ENTRY;
969
970         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971         LASSERT(obd != NULL);
972
973         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974                exp->exp_client_uuid.uuid, obd->obd_name);
975
976         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977         if (exp->exp_connection)
978                 ptlrpc_put_connection_superhack(exp->exp_connection);
979
980         LASSERT(list_empty(&exp->exp_outstanding_replies));
981         LASSERT(list_empty(&exp->exp_uncommitted_replies));
982         LASSERT(list_empty(&exp->exp_req_replay_queue));
983         LASSERT(list_empty(&exp->exp_hp_rpcs));
984         obd_destroy_export(exp);
985         /* self export doesn't hold a reference to an obd, although it
986          * exists until freeing of the obd */
987         if (exp != obd->obd_self_export)
988                 class_decref(obd, "export", exp);
989
990         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
991         EXIT;
992 }
993
/* portals_handle_ops addref hook: take one more reference on the export. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
998
999 static struct portals_handle_ops export_handle_ops = {
1000         .hop_addref = export_handle_addref,
1001         .hop_free   = NULL,
1002 };
1003
1004 struct obd_export *class_export_get(struct obd_export *exp)
1005 {
1006         atomic_inc(&exp->exp_refcount);
1007         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008                atomic_read(&exp->exp_refcount));
1009         return exp;
1010 }
1011 EXPORT_SYMBOL(class_export_get);
1012
1013 void class_export_put(struct obd_export *exp)
1014 {
1015         LASSERT(exp != NULL);
1016         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1017         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018                atomic_read(&exp->exp_refcount) - 1);
1019
1020         if (atomic_dec_and_test(&exp->exp_refcount)) {
1021                 struct obd_device *obd = exp->exp_obd;
1022
1023                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024                        exp, exp->exp_client_uuid.uuid);
1025
1026                 /* release nid stat refererence */
1027                 lprocfs_exp_cleanup(exp);
1028
1029                 if (exp == obd->obd_self_export) {
1030                         /* self export should be destroyed without
1031                          * zombie thread as it doesn't hold a
1032                          * reference to obd and doesn't hold any
1033                          * resources */
1034                         class_export_destroy(exp);
1035                         /* self export is destroyed, no class
1036                          * references exist and it is safe to free
1037                          * obd */
1038                         class_free_dev(obd);
1039                 } else {
1040                         LASSERT(!list_empty(&exp->exp_obd_chain));
1041                         obd_zombie_export_add(exp);
1042                 }
1043
1044         }
1045 }
1046 EXPORT_SYMBOL(class_export_put);
1047
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1049 {
1050         struct obd_export *export;
1051
1052         export = container_of(ws, struct obd_export, exp_zombie_work);
1053         class_export_destroy(export);
1054 }
1055
1056 /* Creates a new export, adds it to the hash table, and returns a
1057  * pointer to it. The refcount is 2: one for the hash reference, and
1058  * one for the pointer returned by this function. */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060                                       struct obd_uuid *cluuid, bool is_self)
1061 {
1062         struct obd_export *export;
1063         struct cfs_hash *hash = NULL;
1064         int rc = 0;
1065         ENTRY;
1066
1067         OBD_ALLOC_PTR(export);
1068         if (!export)
1069                 return ERR_PTR(-ENOMEM);
1070
1071         export->exp_conn_cnt = 0;
1072         export->exp_lock_hash = NULL;
1073         export->exp_flock_hash = NULL;
1074         /* 2 = class_handle_hash + last */
1075         atomic_set(&export->exp_refcount, 2);
1076         atomic_set(&export->exp_rpc_count, 0);
1077         atomic_set(&export->exp_cb_count, 0);
1078         atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080         INIT_LIST_HEAD(&export->exp_locks_list);
1081         spin_lock_init(&export->exp_locks_list_guard);
1082 #endif
1083         atomic_set(&export->exp_replay_count, 0);
1084         export->exp_obd = obd;
1085         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086         spin_lock_init(&export->exp_uncommitted_replies_lock);
1087         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1092         class_handle_hash(&export->exp_handle, &export_handle_ops);
1093         export->exp_last_request_time = ktime_get_real_seconds();
1094         spin_lock_init(&export->exp_lock);
1095         spin_lock_init(&export->exp_rpc_lock);
1096         INIT_HLIST_NODE(&export->exp_uuid_hash);
1097         INIT_HLIST_NODE(&export->exp_nid_hash);
1098         INIT_HLIST_NODE(&export->exp_gen_hash);
1099         spin_lock_init(&export->exp_bl_list_lock);
1100         INIT_LIST_HEAD(&export->exp_bl_list);
1101         INIT_LIST_HEAD(&export->exp_stale_list);
1102         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1103
1104         export->exp_sp_peer = LUSTRE_SP_ANY;
1105         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106         export->exp_client_uuid = *cluuid;
1107         obd_init_export(export);
1108
1109         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110                 spin_lock(&obd->obd_dev_lock);
1111                 /* shouldn't happen, but might race */
1112                 if (obd->obd_stopping)
1113                         GOTO(exit_unlock, rc = -ENODEV);
1114
1115                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1116                 if (hash == NULL)
1117                         GOTO(exit_unlock, rc = -ENODEV);
1118                 spin_unlock(&obd->obd_dev_lock);
1119
1120                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1121                 if (rc != 0) {
1122                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123                                       obd->obd_name, cluuid->uuid, rc);
1124                         GOTO(exit_err, rc = -EALREADY);
1125                 }
1126         }
1127
1128         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1129         spin_lock(&obd->obd_dev_lock);
1130         if (obd->obd_stopping) {
1131                 if (hash)
1132                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1134         }
1135
1136         if (!is_self) {
1137                 class_incref(obd, "export", export);
1138                 list_add_tail(&export->exp_obd_chain_timed,
1139                               &obd->obd_exports_timed);
1140                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141                 obd->obd_num_exports++;
1142         } else {
1143                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144                 INIT_LIST_HEAD(&export->exp_obd_chain);
1145         }
1146         spin_unlock(&obd->obd_dev_lock);
1147         if (hash)
1148                 cfs_hash_putref(hash);
1149         RETURN(export);
1150
1151 exit_unlock:
1152         spin_unlock(&obd->obd_dev_lock);
1153 exit_err:
1154         if (hash)
1155                 cfs_hash_putref(hash);
1156         class_handle_unhash(&export->exp_handle);
1157         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158         obd_destroy_export(export);
1159         OBD_FREE_PTR(export);
1160         return ERR_PTR(rc);
1161 }
1162
1163 struct obd_export *class_new_export(struct obd_device *obd,
1164                                     struct obd_uuid *uuid)
1165 {
1166         return __class_new_export(obd, uuid, false);
1167 }
1168 EXPORT_SYMBOL(class_new_export);
1169
1170 struct obd_export *class_new_export_self(struct obd_device *obd,
1171                                          struct obd_uuid *uuid)
1172 {
1173         return __class_new_export(obd, uuid, true);
1174 }
1175
1176 void class_unlink_export(struct obd_export *exp)
1177 {
1178         class_handle_unhash(&exp->exp_handle);
1179
1180         if (exp->exp_obd->obd_self_export == exp) {
1181                 class_export_put(exp);
1182                 return;
1183         }
1184
1185         spin_lock(&exp->exp_obd->obd_dev_lock);
1186         /* delete an uuid-export hashitem from hashtables */
1187         if (!hlist_unhashed(&exp->exp_uuid_hash))
1188                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189                              &exp->exp_client_uuid,
1190                              &exp->exp_uuid_hash);
1191
1192 #ifdef HAVE_SERVER_SUPPORT
1193         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194                 struct tg_export_data   *ted = &exp->exp_target_data;
1195                 struct cfs_hash         *hash;
1196
1197                 /* Because obd_gen_hash will not be released until
1198                  * class_cleanup(), so hash should never be NULL here */
1199                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200                 LASSERT(hash != NULL);
1201                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202                              &exp->exp_gen_hash);
1203                 cfs_hash_putref(hash);
1204         }
1205 #endif /* HAVE_SERVER_SUPPORT */
1206
1207         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208         list_del_init(&exp->exp_obd_chain_timed);
1209         exp->exp_obd->obd_num_exports--;
1210         spin_unlock(&exp->exp_obd->obd_dev_lock);
1211         atomic_inc(&obd_stale_export_num);
1212
1213         /* A reference is kept by obd_stale_exports list */
1214         obd_stale_export_put(exp);
1215 }
1216 EXPORT_SYMBOL(class_unlink_export);
1217
1218 /* Import management functions */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1220 {
1221         ENTRY;
1222
1223         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224                 imp->imp_obd->obd_name);
1225
1226         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1227
1228         ptlrpc_put_connection_superhack(imp->imp_connection);
1229
1230         while (!list_empty(&imp->imp_conn_list)) {
1231                 struct obd_import_conn *imp_conn;
1232
1233                 imp_conn = list_entry(imp->imp_conn_list.next,
1234                                       struct obd_import_conn, oic_item);
1235                 list_del_init(&imp_conn->oic_item);
1236                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1238         }
1239
1240         LASSERT(imp->imp_sec == NULL);
1241         class_decref(imp->imp_obd, "import", imp);
1242         OBD_FREE_PTR(imp);
1243         EXIT;
1244 }
1245
1246 struct obd_import *class_import_get(struct obd_import *import)
1247 {
1248         atomic_inc(&import->imp_refcount);
1249         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1250                atomic_read(&import->imp_refcount),
1251                import->imp_obd->obd_name);
1252         return import;
1253 }
1254 EXPORT_SYMBOL(class_import_get);
1255
1256 void class_import_put(struct obd_import *imp)
1257 {
1258         ENTRY;
1259
1260         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1261
1262         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1263                atomic_read(&imp->imp_refcount) - 1,
1264                imp->imp_obd->obd_name);
1265
1266         if (atomic_dec_and_test(&imp->imp_refcount)) {
1267                 CDEBUG(D_INFO, "final put import %p\n", imp);
1268                 obd_zombie_import_add(imp);
1269         }
1270
1271         /* catch possible import put race */
1272         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1273         EXIT;
1274 }
1275 EXPORT_SYMBOL(class_import_put);
1276
1277 static void init_imp_at(struct imp_at *at) {
1278         int i;
1279         at_init(&at->iat_net_latency, 0, 0);
1280         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1281                 /* max service estimates are tracked on the server side, so
1282                    don't use the AT history here, just use the last reported
1283                    val. (But keep hist for proc histogram, worst_ever) */
1284                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1285                         AT_FLG_NOHIST);
1286         }
1287 }
1288
1289 static void obd_zombie_imp_cull(struct work_struct *ws)
1290 {
1291         struct obd_import *import;
1292
1293         import = container_of(ws, struct obd_import, imp_zombie_work);
1294         obd_zombie_import_free(import);
1295 }
1296
1297 struct obd_import *class_new_import(struct obd_device *obd)
1298 {
1299         struct obd_import *imp;
1300         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1301
1302         OBD_ALLOC(imp, sizeof(*imp));
1303         if (imp == NULL)
1304                 return NULL;
1305
1306         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1307         INIT_LIST_HEAD(&imp->imp_replay_list);
1308         INIT_LIST_HEAD(&imp->imp_sending_list);
1309         INIT_LIST_HEAD(&imp->imp_delayed_list);
1310         INIT_LIST_HEAD(&imp->imp_committed_list);
1311         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1312         imp->imp_known_replied_xid = 0;
1313         imp->imp_replay_cursor = &imp->imp_committed_list;
1314         spin_lock_init(&imp->imp_lock);
1315         imp->imp_last_success_conn = 0;
1316         imp->imp_state = LUSTRE_IMP_NEW;
1317         imp->imp_obd = class_incref(obd, "import", imp);
1318         mutex_init(&imp->imp_sec_mutex);
1319         init_waitqueue_head(&imp->imp_recovery_waitq);
1320         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1321
1322         if (curr_pid_ns->child_reaper)
1323                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1324         else
1325                 imp->imp_sec_refpid = 1;
1326
1327         atomic_set(&imp->imp_refcount, 2);
1328         atomic_set(&imp->imp_unregistering, 0);
1329         atomic_set(&imp->imp_inflight, 0);
1330         atomic_set(&imp->imp_replay_inflight, 0);
1331         atomic_set(&imp->imp_inval_count, 0);
1332         INIT_LIST_HEAD(&imp->imp_conn_list);
1333         init_imp_at(&imp->imp_at);
1334
1335         /* the default magic is V2, will be used in connect RPC, and
1336          * then adjusted according to the flags in request/reply. */
1337         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1338
1339         return imp;
1340 }
1341 EXPORT_SYMBOL(class_new_import);
1342
1343 void class_destroy_import(struct obd_import *import)
1344 {
1345         LASSERT(import != NULL);
1346         LASSERT(import != LP_POISON);
1347
1348         spin_lock(&import->imp_lock);
1349         import->imp_generation++;
1350         spin_unlock(&import->imp_lock);
1351         class_import_put(import);
1352 }
1353 EXPORT_SYMBOL(class_destroy_import);
1354
1355 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1356
1357 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1358 {
1359         spin_lock(&exp->exp_locks_list_guard);
1360
1361         LASSERT(lock->l_exp_refs_nr >= 0);
1362
1363         if (lock->l_exp_refs_target != NULL &&
1364             lock->l_exp_refs_target != exp) {
1365                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1366                               exp, lock, lock->l_exp_refs_target);
1367         }
1368         if ((lock->l_exp_refs_nr ++) == 0) {
1369                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1370                 lock->l_exp_refs_target = exp;
1371         }
1372         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1373                lock, exp, lock->l_exp_refs_nr);
1374         spin_unlock(&exp->exp_locks_list_guard);
1375 }
1376 EXPORT_SYMBOL(__class_export_add_lock_ref);
1377
1378 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1379 {
1380         spin_lock(&exp->exp_locks_list_guard);
1381         LASSERT(lock->l_exp_refs_nr > 0);
1382         if (lock->l_exp_refs_target != exp) {
1383                 LCONSOLE_WARN("lock %p, "
1384                               "mismatching export pointers: %p, %p\n",
1385                               lock, lock->l_exp_refs_target, exp);
1386         }
1387         if (-- lock->l_exp_refs_nr == 0) {
1388                 list_del_init(&lock->l_exp_refs_link);
1389                 lock->l_exp_refs_target = NULL;
1390         }
1391         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1392                lock, exp, lock->l_exp_refs_nr);
1393         spin_unlock(&exp->exp_locks_list_guard);
1394 }
1395 EXPORT_SYMBOL(__class_export_del_lock_ref);
1396 #endif
1397
1398 /* A connection defines an export context in which preallocation can
1399    be managed. This releases the export pointer reference, and returns
1400    the export handle, so the export refcount is 1 when this function
1401    returns. */
1402 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1403                   struct obd_uuid *cluuid)
1404 {
1405         struct obd_export *export;
1406         LASSERT(conn != NULL);
1407         LASSERT(obd != NULL);
1408         LASSERT(cluuid != NULL);
1409         ENTRY;
1410
1411         export = class_new_export(obd, cluuid);
1412         if (IS_ERR(export))
1413                 RETURN(PTR_ERR(export));
1414
1415         conn->cookie = export->exp_handle.h_cookie;
1416         class_export_put(export);
1417
1418         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1419                cluuid->uuid, conn->cookie);
1420         RETURN(0);
1421 }
1422 EXPORT_SYMBOL(class_connect);
1423
1424 /* if export is involved in recovery then clean up related things */
1425 static void class_export_recovery_cleanup(struct obd_export *exp)
1426 {
1427         struct obd_device *obd = exp->exp_obd;
1428
1429         spin_lock(&obd->obd_recovery_task_lock);
1430         if (obd->obd_recovering) {
1431                 if (exp->exp_in_recovery) {
1432                         spin_lock(&exp->exp_lock);
1433                         exp->exp_in_recovery = 0;
1434                         spin_unlock(&exp->exp_lock);
1435                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1436                         atomic_dec(&obd->obd_connected_clients);
1437                 }
1438
1439                 /* if called during recovery then should update
1440                  * obd_stale_clients counter,
1441                  * lightweight exports are not counted */
1442                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1443                         exp->exp_obd->obd_stale_clients++;
1444         }
1445         spin_unlock(&obd->obd_recovery_task_lock);
1446
1447         spin_lock(&exp->exp_lock);
1448         /** Cleanup req replay fields */
1449         if (exp->exp_req_replay_needed) {
1450                 exp->exp_req_replay_needed = 0;
1451
1452                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1453                 atomic_dec(&obd->obd_req_replay_clients);
1454         }
1455
1456         /** Cleanup lock replay data */
1457         if (exp->exp_lock_replay_needed) {
1458                 exp->exp_lock_replay_needed = 0;
1459
1460                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1461                 atomic_dec(&obd->obd_lock_replay_clients);
1462         }
1463         spin_unlock(&exp->exp_lock);
1464 }
1465
1466 /* This function removes 1-3 references from the export:
1467  * 1 - for export pointer passed
1468  * and if disconnect really need
1469  * 2 - removing from hash
1470  * 3 - in client_unlink_export
1471  * The export pointer passed to this function can destroyed */
1472 int class_disconnect(struct obd_export *export)
1473 {
1474         int already_disconnected;
1475         ENTRY;
1476
1477         if (export == NULL) {
1478                 CWARN("attempting to free NULL export %p\n", export);
1479                 RETURN(-EINVAL);
1480         }
1481
1482         spin_lock(&export->exp_lock);
1483         already_disconnected = export->exp_disconnected;
1484         export->exp_disconnected = 1;
1485         /*  We hold references of export for uuid hash
1486          *  and nid_hash and export link at least. So
1487          *  it is safe to call cfs_hash_del in there.  */
1488         if (!hlist_unhashed(&export->exp_nid_hash))
1489                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1490                              &export->exp_connection->c_peer.nid,
1491                              &export->exp_nid_hash);
1492         spin_unlock(&export->exp_lock);
1493
1494         /* class_cleanup(), abort_recovery(), and class_fail_export()
1495          * all end up in here, and if any of them race we shouldn't
1496          * call extra class_export_puts(). */
1497         if (already_disconnected) {
1498                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1499                 GOTO(no_disconn, already_disconnected);
1500         }
1501
1502         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1503                export->exp_handle.h_cookie);
1504
1505         class_export_recovery_cleanup(export);
1506         class_unlink_export(export);
1507 no_disconn:
1508         class_export_put(export);
1509         RETURN(0);
1510 }
1511 EXPORT_SYMBOL(class_disconnect);
1512
1513 /* Return non-zero for a fully connected export */
1514 int class_connected_export(struct obd_export *exp)
1515 {
1516         int connected = 0;
1517
1518         if (exp) {
1519                 spin_lock(&exp->exp_lock);
1520                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1521                 spin_unlock(&exp->exp_lock);
1522         }
1523         return connected;
1524 }
1525 EXPORT_SYMBOL(class_connected_export);
1526
1527 static void class_disconnect_export_list(struct list_head *list,
1528                                          enum obd_option flags)
1529 {
1530         int rc;
1531         struct obd_export *exp;
1532         ENTRY;
1533
1534         /* It's possible that an export may disconnect itself, but
1535          * nothing else will be added to this list. */
1536         while (!list_empty(list)) {
1537                 exp = list_entry(list->next, struct obd_export,
1538                                  exp_obd_chain);
1539                 /* need for safe call CDEBUG after obd_disconnect */
1540                 class_export_get(exp);
1541
1542                 spin_lock(&exp->exp_lock);
1543                 exp->exp_flags = flags;
1544                 spin_unlock(&exp->exp_lock);
1545
1546                 if (obd_uuid_equals(&exp->exp_client_uuid,
1547                                     &exp->exp_obd->obd_uuid)) {
1548                         CDEBUG(D_HA,
1549                                "exp %p export uuid == obd uuid, don't discon\n",
1550                                exp);
1551                         /* Need to delete this now so we don't end up pointing
1552                          * to work_list later when this export is cleaned up. */
1553                         list_del_init(&exp->exp_obd_chain);
1554                         class_export_put(exp);
1555                         continue;
1556                 }
1557
1558                 class_export_get(exp);
1559                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1560                        "last request at %lld\n",
1561                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1562                        exp, exp->exp_last_request_time);
1563                 /* release one export reference anyway */
1564                 rc = obd_disconnect(exp);
1565
1566                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1567                        obd_export_nid2str(exp), exp, rc);
1568                 class_export_put(exp);
1569         }
1570         EXIT;
1571 }
1572
1573 void class_disconnect_exports(struct obd_device *obd)
1574 {
1575         struct list_head work_list;
1576         ENTRY;
1577
1578         /* Move all of the exports from obd_exports to a work list, en masse. */
1579         INIT_LIST_HEAD(&work_list);
1580         spin_lock(&obd->obd_dev_lock);
1581         list_splice_init(&obd->obd_exports, &work_list);
1582         list_splice_init(&obd->obd_delayed_exports, &work_list);
1583         spin_unlock(&obd->obd_dev_lock);
1584
1585         if (!list_empty(&work_list)) {
1586                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1587                        "disconnecting them\n", obd->obd_minor, obd);
1588                 class_disconnect_export_list(&work_list,
1589                                              exp_flags_from_obd(obd));
1590         } else
1591                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1592                        obd->obd_minor, obd);
1593         EXIT;
1594 }
1595 EXPORT_SYMBOL(class_disconnect_exports);
1596
1597 /* Remove exports that have not completed recovery.
1598  */
1599 void class_disconnect_stale_exports(struct obd_device *obd,
1600                                     int (*test_export)(struct obd_export *))
1601 {
1602         struct list_head work_list;
1603         struct obd_export *exp, *n;
1604         int evicted = 0;
1605         ENTRY;
1606
1607         INIT_LIST_HEAD(&work_list);
1608         spin_lock(&obd->obd_dev_lock);
1609         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1610                                  exp_obd_chain) {
1611                 /* don't count self-export as client */
1612                 if (obd_uuid_equals(&exp->exp_client_uuid,
1613                                     &exp->exp_obd->obd_uuid))
1614                         continue;
1615
1616                 /* don't evict clients which have no slot in last_rcvd
1617                  * (e.g. lightweight connection) */
1618                 if (exp->exp_target_data.ted_lr_idx == -1)
1619                         continue;
1620
1621                 spin_lock(&exp->exp_lock);
1622                 if (exp->exp_failed || test_export(exp)) {
1623                         spin_unlock(&exp->exp_lock);
1624                         continue;
1625                 }
1626                 exp->exp_failed = 1;
1627                 spin_unlock(&exp->exp_lock);
1628
1629                 list_move(&exp->exp_obd_chain, &work_list);
1630                 evicted++;
1631                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1632                        obd->obd_name, exp->exp_client_uuid.uuid,
1633                        obd_export_nid2str(exp));
1634                 print_export_data(exp, "EVICTING", 0, D_HA);
1635         }
1636         spin_unlock(&obd->obd_dev_lock);
1637
1638         if (evicted)
1639                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1640                               obd->obd_name, evicted);
1641
1642         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1643                                                  OBD_OPT_ABORT_RECOV);
1644         EXIT;
1645 }
1646 EXPORT_SYMBOL(class_disconnect_stale_exports);
1647
1648 void class_fail_export(struct obd_export *exp)
1649 {
1650         int rc, already_failed;
1651
1652         spin_lock(&exp->exp_lock);
1653         already_failed = exp->exp_failed;
1654         exp->exp_failed = 1;
1655         spin_unlock(&exp->exp_lock);
1656
1657         if (already_failed) {
1658                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1659                        exp, exp->exp_client_uuid.uuid);
1660                 return;
1661         }
1662
1663         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1664                exp, exp->exp_client_uuid.uuid);
1665
1666         if (obd_dump_on_timeout)
1667                 libcfs_debug_dumplog();
1668
1669         /* need for safe call CDEBUG after obd_disconnect */
1670         class_export_get(exp);
1671
1672         /* Most callers into obd_disconnect are removing their own reference
1673          * (request, for example) in addition to the one from the hash table.
1674          * We don't have such a reference here, so make one. */
1675         class_export_get(exp);
1676         rc = obd_disconnect(exp);
1677         if (rc)
1678                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1679         else
1680                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1681                        exp, exp->exp_client_uuid.uuid);
1682         class_export_put(exp);
1683 }
1684 EXPORT_SYMBOL(class_fail_export);
1685
1686 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1687 {
1688         struct cfs_hash *nid_hash;
1689         struct obd_export *doomed_exp = NULL;
1690         int exports_evicted = 0;
1691
1692         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1693
1694         spin_lock(&obd->obd_dev_lock);
1695         /* umount has run already, so evict thread should leave
1696          * its task to umount thread now */
1697         if (obd->obd_stopping) {
1698                 spin_unlock(&obd->obd_dev_lock);
1699                 return exports_evicted;
1700         }
1701         nid_hash = obd->obd_nid_hash;
1702         cfs_hash_getref(nid_hash);
1703         spin_unlock(&obd->obd_dev_lock);
1704
1705         do {
1706                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1707                 if (doomed_exp == NULL)
1708                         break;
1709
1710                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1711                          "nid %s found, wanted nid %s, requested nid %s\n",
1712                          obd_export_nid2str(doomed_exp),
1713                          libcfs_nid2str(nid_key), nid);
1714                 LASSERTF(doomed_exp != obd->obd_self_export,
1715                          "self-export is hashed by NID?\n");
1716                 exports_evicted++;
1717                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1718                               "request\n", obd->obd_name,
1719                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1720                               obd_export_nid2str(doomed_exp));
1721                 class_fail_export(doomed_exp);
1722                 class_export_put(doomed_exp);
1723         } while (1);
1724
1725         cfs_hash_putref(nid_hash);
1726
1727         if (!exports_evicted)
1728                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1729                        obd->obd_name, nid);
1730         return exports_evicted;
1731 }
1732 EXPORT_SYMBOL(obd_export_evict_by_nid);
1733
1734 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1735 {
1736         struct cfs_hash *uuid_hash;
1737         struct obd_export *doomed_exp = NULL;
1738         struct obd_uuid doomed_uuid;
1739         int exports_evicted = 0;
1740
1741         spin_lock(&obd->obd_dev_lock);
1742         if (obd->obd_stopping) {
1743                 spin_unlock(&obd->obd_dev_lock);
1744                 return exports_evicted;
1745         }
1746         uuid_hash = obd->obd_uuid_hash;
1747         cfs_hash_getref(uuid_hash);
1748         spin_unlock(&obd->obd_dev_lock);
1749
1750         obd_str2uuid(&doomed_uuid, uuid);
1751         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1752                 CERROR("%s: can't evict myself\n", obd->obd_name);
1753                 cfs_hash_putref(uuid_hash);
1754                 return exports_evicted;
1755         }
1756
1757         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1758
1759         if (doomed_exp == NULL) {
1760                 CERROR("%s: can't disconnect %s: no exports found\n",
1761                        obd->obd_name, uuid);
1762         } else {
1763                 CWARN("%s: evicting %s at adminstrative request\n",
1764                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1765                 class_fail_export(doomed_exp);
1766                 class_export_put(doomed_exp);
1767                 exports_evicted++;
1768         }
1769         cfs_hash_putref(uuid_hash);
1770
1771         return exports_evicted;
1772 }
1773
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback run by print_export_data() when its caller asks for
 * lock information; it is unset (NULL) until some other code assigns it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1778
1779 static void print_export_data(struct obd_export *exp, const char *status,
1780                               int locks, int debug_level)
1781 {
1782         struct ptlrpc_reply_state *rs;
1783         struct ptlrpc_reply_state *first_reply = NULL;
1784         int nreplies = 0;
1785
1786         spin_lock(&exp->exp_lock);
1787         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1788                             rs_exp_list) {
1789                 if (nreplies == 0)
1790                         first_reply = rs;
1791                 nreplies++;
1792         }
1793         spin_unlock(&exp->exp_lock);
1794
1795         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1796                "%p %s %llu stale:%d\n",
1797                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1798                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1799                atomic_read(&exp->exp_rpc_count),
1800                atomic_read(&exp->exp_cb_count),
1801                atomic_read(&exp->exp_locks_count),
1802                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1803                nreplies, first_reply, nreplies > 3 ? "..." : "",
1804                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1805 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1806         if (locks && class_export_dump_hook != NULL)
1807                 class_export_dump_hook(exp);
1808 #endif
1809 }
1810
1811 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1812 {
1813         struct obd_export *exp;
1814
1815         spin_lock(&obd->obd_dev_lock);
1816         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1817                 print_export_data(exp, "ACTIVE", locks, debug_level);
1818         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1819                 print_export_data(exp, "UNLINKED", locks, debug_level);
1820         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1821                 print_export_data(exp, "DELAYED", locks, debug_level);
1822         spin_unlock(&obd->obd_dev_lock);
1823 }
1824
1825 void obd_exports_barrier(struct obd_device *obd)
1826 {
1827         int waited = 2;
1828         LASSERT(list_empty(&obd->obd_exports));
1829         spin_lock(&obd->obd_dev_lock);
1830         while (!list_empty(&obd->obd_unlinked_exports)) {
1831                 spin_unlock(&obd->obd_dev_lock);
1832                 set_current_state(TASK_UNINTERRUPTIBLE);
1833                 schedule_timeout(cfs_time_seconds(waited));
1834                 if (waited > 5 && is_power_of_2(waited)) {
1835                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1836                                       "more than %d seconds. "
1837                                       "The obd refcount = %d. Is it stuck?\n",
1838                                       obd->obd_name, waited,
1839                                       atomic_read(&obd->obd_refcount));
1840                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1841                 }
1842                 waited *= 2;
1843                 spin_lock(&obd->obd_dev_lock);
1844         }
1845         spin_unlock(&obd->obd_dev_lock);
1846 }
1847 EXPORT_SYMBOL(obd_exports_barrier);
1848
1849 /**
1850  * Add export to the obd_zombe thread and notify it.
1851  */
1852 static void obd_zombie_export_add(struct obd_export *exp) {
1853         atomic_dec(&obd_stale_export_num);
1854         spin_lock(&exp->exp_obd->obd_dev_lock);
1855         LASSERT(!list_empty(&exp->exp_obd_chain));
1856         list_del_init(&exp->exp_obd_chain);
1857         spin_unlock(&exp->exp_obd->obd_dev_lock);
1858
1859         queue_work(zombie_wq, &exp->exp_zombie_work);
1860 }
1861
1862 /**
1863  * Add import to the obd_zombe thread and notify it.
1864  */
1865 static void obd_zombie_import_add(struct obd_import *imp) {
1866         LASSERT(imp->imp_sec == NULL);
1867
1868         queue_work(zombie_wq, &imp->imp_zombie_work);
1869 }
1870
1871 /**
1872  * wait when obd_zombie import/export queues become empty
1873  */
1874 void obd_zombie_barrier(void)
1875 {
1876         flush_workqueue(zombie_wq);
1877 }
1878 EXPORT_SYMBOL(obd_zombie_barrier);
1879
1880
1881 struct obd_export *obd_stale_export_get(void)
1882 {
1883         struct obd_export *exp = NULL;
1884         ENTRY;
1885
1886         spin_lock(&obd_stale_export_lock);
1887         if (!list_empty(&obd_stale_exports)) {
1888                 exp = list_entry(obd_stale_exports.next,
1889                                  struct obd_export, exp_stale_list);
1890                 list_del_init(&exp->exp_stale_list);
1891         }
1892         spin_unlock(&obd_stale_export_lock);
1893
1894         if (exp) {
1895                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1896                        atomic_read(&obd_stale_export_num));
1897         }
1898         RETURN(exp);
1899 }
1900 EXPORT_SYMBOL(obd_stale_export_get);
1901
1902 void obd_stale_export_put(struct obd_export *exp)
1903 {
1904         ENTRY;
1905
1906         LASSERT(list_empty(&exp->exp_stale_list));
1907         if (exp->exp_lock_hash &&
1908             atomic_read(&exp->exp_lock_hash->hs_count)) {
1909                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1910                        atomic_read(&obd_stale_export_num));
1911
1912                 spin_lock_bh(&exp->exp_bl_list_lock);
1913                 spin_lock(&obd_stale_export_lock);
1914                 /* Add to the tail if there is no blocked locks,
1915                  * to the head otherwise. */
1916                 if (list_empty(&exp->exp_bl_list))
1917                         list_add_tail(&exp->exp_stale_list,
1918                                       &obd_stale_exports);
1919                 else
1920                         list_add(&exp->exp_stale_list,
1921                                  &obd_stale_exports);
1922
1923                 spin_unlock(&obd_stale_export_lock);
1924                 spin_unlock_bh(&exp->exp_bl_list_lock);
1925         } else {
1926                 class_export_put(exp);
1927         }
1928         EXIT;
1929 }
1930 EXPORT_SYMBOL(obd_stale_export_put);
1931
1932 /**
1933  * Adjust the position of the export in the stale list,
1934  * i.e. move to the head of the list if is needed.
1935  **/
1936 void obd_stale_export_adjust(struct obd_export *exp)
1937 {
1938         LASSERT(exp != NULL);
1939         spin_lock_bh(&exp->exp_bl_list_lock);
1940         spin_lock(&obd_stale_export_lock);
1941
1942         if (!list_empty(&exp->exp_stale_list) &&
1943             !list_empty(&exp->exp_bl_list))
1944                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1945
1946         spin_unlock(&obd_stale_export_lock);
1947         spin_unlock_bh(&exp->exp_bl_list_lock);
1948 }
1949 EXPORT_SYMBOL(obd_stale_export_adjust);
1950
1951 /**
1952  * start destroy zombie import/export thread
1953  */
1954 int obd_zombie_impexp_init(void)
1955 {
1956         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1957         if (!zombie_wq)
1958                 return -ENOMEM;
1959
1960         return 0;
1961 }
1962
1963 /**
1964  * stop destroy zombie import/export thread
1965  */
1966 void obd_zombie_impexp_stop(void)
1967 {
1968         destroy_workqueue(zombie_wq);
1969         LASSERT(list_empty(&obd_stale_exports));
1970 }
1971
1972 /***** Kernel-userspace comm helpers *******/
1973
1974 /* Get length of entire message, including header */
1975 int kuc_len(int payload_len)
1976 {
1977         return sizeof(struct kuc_hdr) + payload_len;
1978 }
1979 EXPORT_SYMBOL(kuc_len);
1980
1981 /* Get a pointer to kuc header, given a ptr to the payload
1982  * @param p Pointer to payload area
1983  * @returns Pointer to kuc header
1984  */
1985 struct kuc_hdr * kuc_ptr(void *p)
1986 {
1987         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1988         LASSERT(lh->kuc_magic == KUC_MAGIC);
1989         return lh;
1990 }
1991 EXPORT_SYMBOL(kuc_ptr);
1992
1993 /* Alloc space for a message, and fill in header
1994  * @return Pointer to payload area
1995  */
1996 void *kuc_alloc(int payload_len, int transport, int type)
1997 {
1998         struct kuc_hdr *lh;
1999         int len = kuc_len(payload_len);
2000
2001         OBD_ALLOC(lh, len);
2002         if (lh == NULL)
2003                 return ERR_PTR(-ENOMEM);
2004
2005         lh->kuc_magic = KUC_MAGIC;
2006         lh->kuc_transport = transport;
2007         lh->kuc_msgtype = type;
2008         lh->kuc_msglen = len;
2009
2010         return (void *)(lh + 1);
2011 }
2012 EXPORT_SYMBOL(kuc_alloc);
2013
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload length; the size freed is kuc_len() of it
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int msg_len = kuc_len(payload_len);

	OBD_FREE(hdr, msg_len);
}
EXPORT_SYMBOL(kuc_free);
2021
2022 struct obd_request_slot_waiter {
2023         struct list_head        orsw_entry;
2024         wait_queue_head_t       orsw_waitq;
2025         bool                    orsw_signaled;
2026 };
2027
2028 static bool obd_request_slot_avail(struct client_obd *cli,
2029                                    struct obd_request_slot_waiter *orsw)
2030 {
2031         bool avail;
2032
2033         spin_lock(&cli->cl_loi_list_lock);
2034         avail = !!list_empty(&orsw->orsw_entry);
2035         spin_unlock(&cli->cl_loi_list_lock);
2036
2037         return avail;
2038 };
2039
2040 /*
2041  * For network flow control, the RPC sponsor needs to acquire a credit
2042  * before sending the RPC. The credits count for a connection is defined
2043  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2044  * the subsequent RPC sponsors need to wait until others released their
2045  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2046  */
2047 int obd_get_request_slot(struct client_obd *cli)
2048 {
2049         struct obd_request_slot_waiter   orsw;
2050         struct l_wait_info               lwi;
2051         int                              rc;
2052
2053         spin_lock(&cli->cl_loi_list_lock);
2054         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2055                 cli->cl_rpcs_in_flight++;
2056                 spin_unlock(&cli->cl_loi_list_lock);
2057                 return 0;
2058         }
2059
2060         init_waitqueue_head(&orsw.orsw_waitq);
2061         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2062         orsw.orsw_signaled = false;
2063         spin_unlock(&cli->cl_loi_list_lock);
2064
2065         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2066         rc = l_wait_event(orsw.orsw_waitq,
2067                           obd_request_slot_avail(cli, &orsw) ||
2068                           orsw.orsw_signaled,
2069                           &lwi);
2070
2071         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2072          * freed but other (such as obd_put_request_slot) is using it. */
2073         spin_lock(&cli->cl_loi_list_lock);
2074         if (rc != 0) {
2075                 if (!orsw.orsw_signaled) {
2076                         if (list_empty(&orsw.orsw_entry))
2077                                 cli->cl_rpcs_in_flight--;
2078                         else
2079                                 list_del(&orsw.orsw_entry);
2080                 }
2081         }
2082
2083         if (orsw.orsw_signaled) {
2084                 LASSERT(list_empty(&orsw.orsw_entry));
2085
2086                 rc = -EINTR;
2087         }
2088         spin_unlock(&cli->cl_loi_list_lock);
2089
2090         return rc;
2091 }
2092 EXPORT_SYMBOL(obd_get_request_slot);
2093
2094 void obd_put_request_slot(struct client_obd *cli)
2095 {
2096         struct obd_request_slot_waiter *orsw;
2097
2098         spin_lock(&cli->cl_loi_list_lock);
2099         cli->cl_rpcs_in_flight--;
2100
2101         /* If there is free slot, wakeup the first waiter. */
2102         if (!list_empty(&cli->cl_flight_waiters) &&
2103             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2104                 orsw = list_entry(cli->cl_flight_waiters.next,
2105                                   struct obd_request_slot_waiter, orsw_entry);
2106                 list_del_init(&orsw->orsw_entry);
2107                 cli->cl_rpcs_in_flight++;
2108                 wake_up(&orsw->orsw_waitq);
2109         }
2110         spin_unlock(&cli->cl_loi_list_lock);
2111 }
2112 EXPORT_SYMBOL(obd_put_request_slot);
2113
2114 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2115 {
2116         return cli->cl_max_rpcs_in_flight;
2117 }
2118 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2119
2120 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2121 {
2122         struct obd_request_slot_waiter *orsw;
2123         __u32                           old;
2124         int                             diff;
2125         int                             i;
2126         char                            *typ_name;
2127         int                             rc;
2128
2129         if (max > OBD_MAX_RIF_MAX || max < 1)
2130                 return -ERANGE;
2131
2132         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2133         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2134                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2135                  * strictly lower that max_rpcs_in_flight */
2136                 if (max < 2) {
2137                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2138                                "because it must be higher than "
2139                                "max_mod_rpcs_in_flight value",
2140                                cli->cl_import->imp_obd->obd_name);
2141                         return -ERANGE;
2142                 }
2143                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2144                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2145                         if (rc != 0)
2146                                 return rc;
2147                 }
2148         }
2149
2150         spin_lock(&cli->cl_loi_list_lock);
2151         old = cli->cl_max_rpcs_in_flight;
2152         cli->cl_max_rpcs_in_flight = max;
2153         client_adjust_max_dirty(cli);
2154
2155         diff = max - old;
2156
2157         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2158         for (i = 0; i < diff; i++) {
2159                 if (list_empty(&cli->cl_flight_waiters))
2160                         break;
2161
2162                 orsw = list_entry(cli->cl_flight_waiters.next,
2163                                   struct obd_request_slot_waiter, orsw_entry);
2164                 list_del_init(&orsw->orsw_entry);
2165                 cli->cl_rpcs_in_flight++;
2166                 wake_up(&orsw->orsw_waitq);
2167         }
2168         spin_unlock(&cli->cl_loi_list_lock);
2169
2170         return 0;
2171 }
2172 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2173
2174 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2175 {
2176         return cli->cl_max_mod_rpcs_in_flight;
2177 }
2178 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2179
2180 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2181 {
2182         struct obd_connect_data *ocd;
2183         __u16 maxmodrpcs;
2184         __u16 prev;
2185
2186         if (max > OBD_MAX_RIF_MAX || max < 1)
2187                 return -ERANGE;
2188
2189         /* cannot exceed or equal max_rpcs_in_flight */
2190         if (max >= cli->cl_max_rpcs_in_flight) {
2191                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2192                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2193                        cli->cl_import->imp_obd->obd_name,
2194                        max, cli->cl_max_rpcs_in_flight);
2195                 return -ERANGE;
2196         }
2197
2198         /* cannot exceed max modify RPCs in flight supported by the server */
2199         ocd = &cli->cl_import->imp_connect_data;
2200         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2201                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2202         else
2203                 maxmodrpcs = 1;
2204         if (max > maxmodrpcs) {
2205                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2206                        "higher than max_mod_rpcs_per_client value (%hu) "
2207                        "returned by the server at connection\n",
2208                        cli->cl_import->imp_obd->obd_name,
2209                        max, maxmodrpcs);
2210                 return -ERANGE;
2211         }
2212
2213         spin_lock(&cli->cl_mod_rpcs_lock);
2214
2215         prev = cli->cl_max_mod_rpcs_in_flight;
2216         cli->cl_max_mod_rpcs_in_flight = max;
2217
2218         /* wakeup waiters if limit has been increased */
2219         if (cli->cl_max_mod_rpcs_in_flight > prev)
2220                 wake_up(&cli->cl_mod_rpcs_waitq);
2221
2222         spin_unlock(&cli->cl_mod_rpcs_lock);
2223
2224         return 0;
2225 }
2226 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2227
2228 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2229                                struct seq_file *seq)
2230 {
2231         unsigned long mod_tot = 0, mod_cum;
2232         struct timespec64 now;
2233         int i;
2234
2235         ktime_get_real_ts64(&now);
2236
2237         spin_lock(&cli->cl_mod_rpcs_lock);
2238
2239         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2240                    (s64)now.tv_sec, now.tv_nsec);
2241         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2242                    cli->cl_mod_rpcs_in_flight);
2243
2244         seq_printf(seq, "\n\t\t\tmodify\n");
2245         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2246
2247         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2248
2249         mod_cum = 0;
2250         for (i = 0; i < OBD_HIST_MAX; i++) {
2251                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2252                 mod_cum += mod;
2253                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2254                            i, mod, pct(mod, mod_tot),
2255                            pct(mod_cum, mod_tot));
2256                 if (mod_cum == mod_tot)
2257                         break;
2258         }
2259
2260         spin_unlock(&cli->cl_mod_rpcs_lock);
2261
2262         return 0;
2263 }
2264 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2265
2266 /* The number of modify RPCs sent in parallel is limited
2267  * because the server has a finite number of slots per client to
2268  * store request result and ensure reply reconstruction when needed.
2269  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2270  * that takes into account server limit and cl_max_rpcs_in_flight
2271  * value.
2272  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2273  * one close request is allowed above the maximum.
2274  */
2275 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2276                                                  bool close_req)
2277 {
2278         bool avail;
2279
2280         /* A slot is available if
2281          * - number of modify RPCs in flight is less than the max
2282          * - it's a close RPC and no other close request is in flight
2283          */
2284         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2285                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2286
2287         return avail;
2288 }
2289
2290 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2291                                          bool close_req)
2292 {
2293         bool avail;
2294
2295         spin_lock(&cli->cl_mod_rpcs_lock);
2296         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2297         spin_unlock(&cli->cl_mod_rpcs_lock);
2298         return avail;
2299 }
2300
2301 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2302 {
2303         if (it != NULL &&
2304             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2305              it->it_op == IT_READDIR ||
2306              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2307                         return true;
2308         return false;
2309 }
2310
2311 /* Get a modify RPC slot from the obd client @cli according
2312  * to the kind of operation @opc that is going to be sent
2313  * and the intent @it of the operation if it applies.
2314  * If the maximum number of modify RPCs in flight is reached
2315  * the thread is put to sleep.
2316  * Returns the tag to be set in the request message. Tag 0
2317  * is reserved for non-modifying requests.
2318  */
2319 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2320                            struct lookup_intent *it)
2321 {
2322         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2323         bool                    close_req = false;
2324         __u16                   i, max;
2325
2326         /* read-only metadata RPCs don't consume a slot on MDT
2327          * for reply reconstruction
2328          */
2329         if (obd_skip_mod_rpc_slot(it))
2330                 return 0;
2331
2332         if (opc == MDS_CLOSE)
2333                 close_req = true;
2334
2335         do {
2336                 spin_lock(&cli->cl_mod_rpcs_lock);
2337                 max = cli->cl_max_mod_rpcs_in_flight;
2338                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2339                         /* there is a slot available */
2340                         cli->cl_mod_rpcs_in_flight++;
2341                         if (close_req)
2342                                 cli->cl_close_rpcs_in_flight++;
2343                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2344                                          cli->cl_mod_rpcs_in_flight);
2345                         /* find a free tag */
2346                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2347                                                 max + 1);
2348                         LASSERT(i < OBD_MAX_RIF_MAX);
2349                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2350                         spin_unlock(&cli->cl_mod_rpcs_lock);
2351                         /* tag 0 is reserved for non-modify RPCs */
2352                         return i + 1;
2353                 }
2354                 spin_unlock(&cli->cl_mod_rpcs_lock);
2355
2356                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2357                        "opc %u, max %hu\n",
2358                        cli->cl_import->imp_obd->obd_name, opc, max);
2359
2360                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2361                                        obd_mod_rpc_slot_avail(cli, close_req),
2362                                        &lwi);
2363         } while (true);
2364 }
2365 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2366
2367 /* Put a modify RPC slot from the obd client @cli according
2368  * to the kind of operation @opc that has been sent and the
2369  * intent @it of the operation if it applies.
2370  */
2371 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2372                           struct lookup_intent *it, __u16 tag)
2373 {
2374         bool                    close_req = false;
2375
2376         if (obd_skip_mod_rpc_slot(it))
2377                 return;
2378
2379         if (opc == MDS_CLOSE)
2380                 close_req = true;
2381
2382         spin_lock(&cli->cl_mod_rpcs_lock);
2383         cli->cl_mod_rpcs_in_flight--;
2384         if (close_req)
2385                 cli->cl_close_rpcs_in_flight--;
2386         /* release the tag in the bitmap */
2387         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2388         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2389         spin_unlock(&cli->cl_mod_rpcs_lock);
2390         wake_up(&cli->cl_mod_rpcs_waitq);
2391 }
2392 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2393