Whamcloud - gitweb
4923838645928ed1c67a76391e7a5bec1a474801
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53
54 static struct kmem_cache *obd_device_cachep;
55
56 static struct workqueue_struct *zombie_wq;
57
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks, int debug_level);
62
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
66
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165 #ifdef HAVE_SERVER_SUPPORT
166         if (type->typ_sym_filter)
167                 type->typ_debugfs_entry = NULL;
168 #endif
169         debugfs_remove_recursive(type->typ_debugfs_entry);
170         type->typ_debugfs_entry = NULL;
171
172         if (type->typ_lu)
173                 lu_device_type_fini(type->typ_lu);
174
175         spin_lock(&obd_types_lock);
176         list_del(&type->typ_chain);
177         spin_unlock(&obd_types_lock);
178
179 #ifdef CONFIG_PROC_FS
180         if (type->typ_name && type->typ_procroot)
181                 remove_proc_subtree(type->typ_name, proc_lustre_root);
182 #endif
183         if (type->typ_md_ops)
184                 OBD_FREE_PTR(type->typ_md_ops);
185         if (type->typ_dt_ops)
186                 OBD_FREE_PTR(type->typ_dt_ops);
187
188         OBD_FREE(type, sizeof(*type));
189 }
190
191 static struct kobj_type class_ktype = {
192         .sysfs_ops      = &lustre_sysfs_ops,
193         .release        = class_sysfs_release,
194 };
195
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type that only carries sysfs/debugfs (and
 * optionally proc) directories named \a name.
 *
 * The type is marked with typ_sym_filter and is not added to obd_types.
 *
 * \param[in] name         directory / type name
 * \param[in] enable_proc  also register a compat entry under proc_lustre_root
 *
 * \retval type            the new obd_type
 * \retval ERR_PTR(-EEXIST) a kobject of this name already exists in lustre_kset
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(errno)   kobject or debugfs registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
	struct dentry *symlink;
	struct obd_type *type;
	struct kobject *kobj;
	int rc;

	kobj = kset_find_obj(lustre_kset, name);
	if (kobj) {
		/* drop the reference kset_find_obj() took */
		kobject_put(kobj);
		return ERR_PTR(-EEXIST);
	}

	OBD_ALLOC(type, sizeof(*type));
	if (!type)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&type->typ_chain);

	type->typ_kobj.kset = lustre_kset;
	rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
				  &lustre_kset->kobj, "%s", name);
	if (rc) {
		/* A failed kobject_init_and_add() must still be followed by
		 * kobject_put(); the release callback then frees \a type
		 * instead of it being leaked. */
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}

	symlink = debugfs_create_dir(name, debugfs_lustre_root);
	if (IS_ERR_OR_NULL(symlink)) {
		rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
		kobject_put(&type->typ_kobj);
		return ERR_PTR(rc);
	}
	type->typ_debugfs_entry = symlink;
	type->typ_sym_filter = true;

	if (enable_proc) {
		type->typ_procroot = lprocfs_register(name, proc_lustre_root,
						      NULL, NULL);
		/* failure to create the proc entry is logged, not fatal */
		if (IS_ERR(type->typ_procroot)) {
			CERROR("%s: can't create compat proc entry: %d\n",
			       name, (int)PTR_ERR(type->typ_procroot));
			type->typ_procroot = NULL;
		}
	}

	return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
245
246 #define CLASS_MAX_NAME 1024
247
248 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
249                         bool enable_proc, struct lprocfs_vars *vars,
250                         const char *name, struct lu_device_type *ldt)
251 {
252         struct obd_type *type;
253         int rc;
254
255         ENTRY;
256         /* sanity check */
257         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
258
259         if (class_search_type(name)) {
260 #ifdef HAVE_SERVER_SUPPORT
261                 if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
262                     strcmp(name, LUSTRE_OSC_NAME) == 0) {
263                         struct kobject *kobj;
264
265                         kobj = kset_find_obj(lustre_kset, name);
266                         if (kobj) {
267                                 type = container_of(kobj, struct obd_type,
268                                                     typ_kobj);
269                                 goto dir_exist;
270                         }
271                 }
272 #endif /* HAVE_SERVER_SUPPORT */
273                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
274                 RETURN(-EEXIST);
275         }
276
277         OBD_ALLOC(type, sizeof(*type));
278         if (type == NULL)
279                 RETURN(-ENOMEM);
280
281         INIT_LIST_HEAD(&type->typ_chain);
282         type->typ_kobj.kset = lustre_kset;
283         kobject_init(&type->typ_kobj, &class_ktype);
284 #ifdef HAVE_SERVER_SUPPORT
285 dir_exist:
286 #endif /* HAVE_SERVER_SUPPORT */
287         OBD_ALLOC_PTR(type->typ_dt_ops);
288         OBD_ALLOC_PTR(type->typ_md_ops);
289
290         if (type->typ_dt_ops == NULL ||
291             type->typ_md_ops == NULL)
292                 GOTO (failed, rc = -ENOMEM);
293
294         *(type->typ_dt_ops) = *dt_ops;
295         /* md_ops is optional */
296         if (md_ops)
297                 *(type->typ_md_ops) = *md_ops;
298         spin_lock_init(&type->obd_type_lock);
299
300 #ifdef HAVE_SERVER_SUPPORT
301         if (type->typ_sym_filter)
302                 goto setup_ldt;
303 #endif
304 #ifdef CONFIG_PROC_FS
305         if (enable_proc && !type->typ_procroot) {
306                 type->typ_procroot = lprocfs_register(name,
307                                                       proc_lustre_root,
308                                                       NULL, type);
309                 if (IS_ERR(type->typ_procroot)) {
310                         rc = PTR_ERR(type->typ_procroot);
311                         type->typ_procroot = NULL;
312                         GOTO(failed, rc);
313                 }
314         }
315 #endif
316         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
317                                                     vars, type);
318         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
319                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
320                                              : -ENOMEM;
321                 type->typ_debugfs_entry = NULL;
322                 GOTO(failed, rc);
323         }
324
325         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
326         if (rc)
327                 GOTO(failed, rc);
328 #ifdef HAVE_SERVER_SUPPORT
329 setup_ldt:
330 #endif
331         if (ldt) {
332                 type->typ_lu = ldt;
333                 rc = lu_device_type_init(ldt);
334                 if (rc)
335                         GOTO(failed, rc);
336         }
337
338         spin_lock(&obd_types_lock);
339         list_add(&type->typ_chain, &obd_types);
340         spin_unlock(&obd_types_lock);
341
342         RETURN(0);
343
344 failed:
345         kobject_put(&type->typ_kobj);
346
347         RETURN(rc);
348 }
349 EXPORT_SYMBOL(class_register_type);
350
351 int class_unregister_type(const char *name)
352 {
353         struct obd_type *type = class_search_type(name);
354         ENTRY;
355
356         if (!type) {
357                 CERROR("unknown obd type\n");
358                 RETURN(-EINVAL);
359         }
360
361         if (type->typ_refcnt) {
362                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
363                 /* This is a bad situation, let's make the best of it */
364                 /* Remove ops, but leave the name for debugging */
365                 OBD_FREE_PTR(type->typ_dt_ops);
366                 OBD_FREE_PTR(type->typ_md_ops);
367                 RETURN(-EBUSY);
368         }
369
370         kobject_put(&type->typ_kobj);
371
372         RETURN(0);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
375
376 /**
377  * Create a new obd device.
378  *
379  * Allocate the new obd_device and initialize it.
380  *
381  * \param[in] type_name obd device type string.
382  * \param[in] name      obd device name.
383  * \param[in] uuid      obd device UUID
384  *
385  * \retval newdev         pointer to created obd_device
386  * \retval ERR_PTR(errno) on error
387  */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
389                                 const char *uuid)
390 {
391         struct obd_device *newdev;
392         struct obd_type *type = NULL;
393         ENTRY;
394
395         if (strlen(name) >= MAX_OBD_NAME) {
396                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397                 RETURN(ERR_PTR(-EINVAL));
398         }
399
400         type = class_get_type(type_name);
401         if (type == NULL){
402                 CERROR("OBD: unknown type: %s\n", type_name);
403                 RETURN(ERR_PTR(-ENODEV));
404         }
405
406         newdev = obd_device_alloc();
407         if (newdev == NULL) {
408                 class_put_type(type);
409                 RETURN(ERR_PTR(-ENOMEM));
410         }
411         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413         newdev->obd_type = type;
414         newdev->obd_minor = -1;
415
416         rwlock_init(&newdev->obd_pool_lock);
417         newdev->obd_pool_limit = 0;
418         newdev->obd_pool_slv = 0;
419
420         INIT_LIST_HEAD(&newdev->obd_exports);
421         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423         INIT_LIST_HEAD(&newdev->obd_exports_timed);
424         INIT_LIST_HEAD(&newdev->obd_nid_stats);
425         spin_lock_init(&newdev->obd_nid_lock);
426         spin_lock_init(&newdev->obd_dev_lock);
427         mutex_init(&newdev->obd_dev_mutex);
428         spin_lock_init(&newdev->obd_osfs_lock);
429         /* newdev->obd_osfs_age must be set to a value in the distant
430          * past to guarantee a fresh statfs is fetched on mount. */
431         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
432
433         /* XXX belongs in setup not attach  */
434         init_rwsem(&newdev->obd_observer_link_sem);
435         /* recovery data */
436         spin_lock_init(&newdev->obd_recovery_task_lock);
437         init_waitqueue_head(&newdev->obd_next_transno_waitq);
438         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
439         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
440         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
441         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
442         INIT_LIST_HEAD(&newdev->obd_evict_list);
443         INIT_LIST_HEAD(&newdev->obd_lwp_list);
444
445         llog_group_init(&newdev->obd_olg);
446         /* Detach drops this */
447         atomic_set(&newdev->obd_refcount, 1);
448         lu_ref_init(&newdev->obd_reference);
449         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
450
451         newdev->obd_conn_inprogress = 0;
452
453         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
454
455         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
456                newdev->obd_name, newdev);
457
458         return newdev;
459 }
460
461 /**
462  * Free obd device.
463  *
464  * \param[in] obd obd_device to be freed
465  *
466  * \retval none
467  */
468 void class_free_dev(struct obd_device *obd)
469 {
470         struct obd_type *obd_type = obd->obd_type;
471
472         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
473                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
474         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
475                  "obd %p != obd_devs[%d] %p\n",
476                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
477         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
478                  "obd_refcount should be 0, not %d\n",
479                  atomic_read(&obd->obd_refcount));
480         LASSERT(obd_type != NULL);
481
482         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
483                obd->obd_name, obd->obd_type->typ_name);
484
485         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
486                          obd->obd_name, obd->obd_uuid.uuid);
487         if (obd->obd_stopping) {
488                 int err;
489
490                 /* If we're not stopping, we were never set up */
491                 err = obd_cleanup(obd);
492                 if (err)
493                         CERROR("Cleanup %s returned %d\n",
494                                 obd->obd_name, err);
495         }
496
497         obd_device_free(obd);
498
499         class_put_type(obd_type);
500 }
501
502 /**
503  * Unregister obd device.
504  *
505  * Free slot in obd_dev[] used by \a obd.
506  *
507  * \param[in] new_obd obd_device to be unregistered
508  *
509  * \retval none
510  */
511 void class_unregister_device(struct obd_device *obd)
512 {
513         write_lock(&obd_dev_lock);
514         if (obd->obd_minor >= 0) {
515                 LASSERT(obd_devs[obd->obd_minor] == obd);
516                 obd_devs[obd->obd_minor] = NULL;
517                 obd->obd_minor = -1;
518         }
519         write_unlock(&obd_dev_lock);
520 }
521
522 /**
523  * Register obd device.
524  *
525  * Find free slot in obd_devs[], fills it with \a new_obd.
526  *
527  * \param[in] new_obd obd_device to be registered
528  *
529  * \retval 0          success
530  * \retval -EEXIST    device with this name is registered
531  * \retval -EOVERFLOW obd_devs[] is full
532  */
533 int class_register_device(struct obd_device *new_obd)
534 {
535         int ret = 0;
536         int i;
537         int new_obd_minor = 0;
538         bool minor_assign = false;
539         bool retried = false;
540
541 again:
542         write_lock(&obd_dev_lock);
543         for (i = 0; i < class_devno_max(); i++) {
544                 struct obd_device *obd = class_num2obd(i);
545
546                 if (obd != NULL &&
547                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
548
549                         if (!retried) {
550                                 write_unlock(&obd_dev_lock);
551
552                                 /* the obd_device could be waited to be
553                                  * destroyed by the "obd_zombie_impexp_thread".
554                                  */
555                                 obd_zombie_barrier();
556                                 retried = true;
557                                 goto again;
558                         }
559
560                         CERROR("%s: already exists, won't add\n",
561                                obd->obd_name);
562                         /* in case we found a free slot before duplicate */
563                         minor_assign = false;
564                         ret = -EEXIST;
565                         break;
566                 }
567                 if (!minor_assign && obd == NULL) {
568                         new_obd_minor = i;
569                         minor_assign = true;
570                 }
571         }
572
573         if (minor_assign) {
574                 new_obd->obd_minor = new_obd_minor;
575                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
576                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
577                 obd_devs[new_obd_minor] = new_obd;
578         } else {
579                 if (ret == 0) {
580                         ret = -EOVERFLOW;
581                         CERROR("%s: all %u/%u devices used, increase "
582                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
583                                i, class_devno_max(), ret);
584                 }
585         }
586         write_unlock(&obd_dev_lock);
587
588         RETURN(ret);
589 }
590
591 static int class_name2dev_nolock(const char *name)
592 {
593         int i;
594
595         if (!name)
596                 return -1;
597
598         for (i = 0; i < class_devno_max(); i++) {
599                 struct obd_device *obd = class_num2obd(i);
600
601                 if (obd && strcmp(name, obd->obd_name) == 0) {
602                         /* Make sure we finished attaching before we give
603                            out any references */
604                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
605                         if (obd->obd_attached) {
606                                 return i;
607                         }
608                         break;
609                 }
610         }
611
612         return -1;
613 }
614
615 int class_name2dev(const char *name)
616 {
617         int i;
618
619         if (!name)
620                 return -1;
621
622         read_lock(&obd_dev_lock);
623         i = class_name2dev_nolock(name);
624         read_unlock(&obd_dev_lock);
625
626         return i;
627 }
628 EXPORT_SYMBOL(class_name2dev);
629
630 struct obd_device *class_name2obd(const char *name)
631 {
632         int dev = class_name2dev(name);
633
634         if (dev < 0 || dev > class_devno_max())
635                 return NULL;
636         return class_num2obd(dev);
637 }
638 EXPORT_SYMBOL(class_name2obd);
639
640 int class_uuid2dev_nolock(struct obd_uuid *uuid)
641 {
642         int i;
643
644         for (i = 0; i < class_devno_max(); i++) {
645                 struct obd_device *obd = class_num2obd(i);
646
647                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
648                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
649                         return i;
650                 }
651         }
652
653         return -1;
654 }
655
656 int class_uuid2dev(struct obd_uuid *uuid)
657 {
658         int i;
659
660         read_lock(&obd_dev_lock);
661         i = class_uuid2dev_nolock(uuid);
662         read_unlock(&obd_dev_lock);
663
664         return i;
665 }
666 EXPORT_SYMBOL(class_uuid2dev);
667
668 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
669 {
670         int dev = class_uuid2dev(uuid);
671         if (dev < 0)
672                 return NULL;
673         return class_num2obd(dev);
674 }
675 EXPORT_SYMBOL(class_uuid2obd);
676
677 /**
678  * Get obd device from ::obd_devs[]
679  *
680  * \param num [in] array index
681  *
682  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
683  *         otherwise return the obd device there.
684  */
685 struct obd_device *class_num2obd(int num)
686 {
687         struct obd_device *obd = NULL;
688
689         if (num < class_devno_max()) {
690                 obd = obd_devs[num];
691                 if (obd == NULL)
692                         return NULL;
693
694                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
695                          "%p obd_magic %08x != %08x\n",
696                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
697                 LASSERTF(obd->obd_minor == num,
698                          "%p obd_minor %0d != %0d\n",
699                          obd, obd->obd_minor, num);
700         }
701
702         return obd;
703 }
704
705 /**
706  * Find obd in obd_dev[] by name or uuid.
707  *
708  * Increment obd's refcount if found.
709  *
710  * \param[in] str obd name or uuid
711  *
712  * \retval NULL    if not found
713  * \retval target  pointer to found obd_device
714  */
715 struct obd_device *class_dev_by_str(const char *str)
716 {
717         struct obd_device *target = NULL;
718         struct obd_uuid tgtuuid;
719         int rc;
720
721         obd_str2uuid(&tgtuuid, str);
722
723         read_lock(&obd_dev_lock);
724         rc = class_uuid2dev_nolock(&tgtuuid);
725         if (rc < 0)
726                 rc = class_name2dev_nolock(str);
727
728         if (rc >= 0)
729                 target = class_num2obd(rc);
730
731         if (target != NULL)
732                 class_incref(target, "find", current);
733         read_unlock(&obd_dev_lock);
734
735         RETURN(target);
736 }
737 EXPORT_SYMBOL(class_dev_by_str);
738
739 /**
740  * Get obd devices count. Device in any
741  *    state are counted
742  * \retval obd device count
743  */
744 int get_devices_count(void)
745 {
746         int index, max_index = class_devno_max(), dev_count = 0;
747
748         read_lock(&obd_dev_lock);
749         for (index = 0; index <= max_index; index++) {
750                 struct obd_device *obd = class_num2obd(index);
751                 if (obd != NULL)
752                         dev_count++;
753         }
754         read_unlock(&obd_dev_lock);
755
756         return dev_count;
757 }
758 EXPORT_SYMBOL(get_devices_count);
759
760 void class_obd_list(void)
761 {
762         char *status;
763         int i;
764
765         read_lock(&obd_dev_lock);
766         for (i = 0; i < class_devno_max(); i++) {
767                 struct obd_device *obd = class_num2obd(i);
768
769                 if (obd == NULL)
770                         continue;
771                 if (obd->obd_stopping)
772                         status = "ST";
773                 else if (obd->obd_set_up)
774                         status = "UP";
775                 else if (obd->obd_attached)
776                         status = "AT";
777                 else
778                         status = "--";
779                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
780                          i, status, obd->obd_type->typ_name,
781                          obd->obd_name, obd->obd_uuid.uuid,
782                          atomic_read(&obd->obd_refcount));
783         }
784         read_unlock(&obd_dev_lock);
785         return;
786 }
787
788 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
789    specified, then only the client with that uuid is returned,
790    otherwise any client connected to the tgt is returned. */
791 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
792                                           const char *type_name,
793                                           struct obd_uuid *grp_uuid)
794 {
795         int i;
796
797         read_lock(&obd_dev_lock);
798         for (i = 0; i < class_devno_max(); i++) {
799                 struct obd_device *obd = class_num2obd(i);
800
801                 if (obd == NULL)
802                         continue;
803                 if ((strncmp(obd->obd_type->typ_name, type_name,
804                              strlen(type_name)) == 0)) {
805                         if (obd_uuid_equals(tgt_uuid,
806                                             &obd->u.cli.cl_target_uuid) &&
807                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
808                                                          &obd->obd_uuid) : 1)) {
809                                 read_unlock(&obd_dev_lock);
810                                 return obd;
811                         }
812                 }
813         }
814         read_unlock(&obd_dev_lock);
815
816         return NULL;
817 }
818 EXPORT_SYMBOL(class_find_client_obd);
819
820 /* Iterate the obd_device list looking devices have grp_uuid. Start
821    searching at *next, and if a device is found, the next index to look
822    at is saved in *next. If next is NULL, then the first matching device
823    will always be returned. */
824 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
825 {
826         int i;
827
828         if (next == NULL)
829                 i = 0;
830         else if (*next >= 0 && *next < class_devno_max())
831                 i = *next;
832         else
833                 return NULL;
834
835         read_lock(&obd_dev_lock);
836         for (; i < class_devno_max(); i++) {
837                 struct obd_device *obd = class_num2obd(i);
838
839                 if (obd == NULL)
840                         continue;
841                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
842                         if (next != NULL)
843                                 *next = i+1;
844                         read_unlock(&obd_dev_lock);
845                         return obd;
846                 }
847         }
848         read_unlock(&obd_dev_lock);
849
850         return NULL;
851 }
852 EXPORT_SYMBOL(class_devices_in_group);
853
854 /**
855  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
856  * adjust sptlrpc settings accordingly.
857  */
858 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
859 {
860         struct obd_device  *obd;
861         const char         *type;
862         int                 i, rc = 0, rc2;
863
864         LASSERT(namelen > 0);
865
866         read_lock(&obd_dev_lock);
867         for (i = 0; i < class_devno_max(); i++) {
868                 obd = class_num2obd(i);
869
870                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
871                         continue;
872
873                 /* only notify mdc, osc, osp, lwp, mdt, ost
874                  * because only these have a -sptlrpc llog */
875                 type = obd->obd_type->typ_name;
876                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
877                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
878                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
879                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
880                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
881                     strcmp(type, LUSTRE_OST_NAME) != 0)
882                         continue;
883
884                 if (strncmp(obd->obd_name, fsname, namelen))
885                         continue;
886
887                 class_incref(obd, __FUNCTION__, obd);
888                 read_unlock(&obd_dev_lock);
889                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
890                                          sizeof(KEY_SPTLRPC_CONF),
891                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
892                 rc = rc ? rc : rc2;
893                 class_decref(obd, __FUNCTION__, obd);
894                 read_lock(&obd_dev_lock);
895         }
896         read_unlock(&obd_dev_lock);
897         return rc;
898 }
899 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
900
901 void obd_cleanup_caches(void)
902 {
903         ENTRY;
904         if (obd_device_cachep) {
905                 kmem_cache_destroy(obd_device_cachep);
906                 obd_device_cachep = NULL;
907         }
908
909         EXIT;
910 }
911
912 int obd_init_caches(void)
913 {
914         int rc;
915         ENTRY;
916
917         LASSERT(obd_device_cachep == NULL);
918         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
919                                               sizeof(struct obd_device),
920                                               0, 0, NULL);
921         if (!obd_device_cachep)
922                 GOTO(out, rc = -ENOMEM);
923
924         RETURN(0);
925 out:
926         obd_cleanup_caches();
927         RETURN(rc);
928 }
929
930 /* map connection to client */
931 struct obd_export *class_conn2export(struct lustre_handle *conn)
932 {
933         struct obd_export *export;
934         ENTRY;
935
936         if (!conn) {
937                 CDEBUG(D_CACHE, "looking for null handle\n");
938                 RETURN(NULL);
939         }
940
941         if (conn->cookie == -1) {  /* this means assign a new connection */
942                 CDEBUG(D_CACHE, "want a new connection\n");
943                 RETURN(NULL);
944         }
945
946         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
947         export = class_handle2object(conn->cookie, NULL);
948         RETURN(export);
949 }
950 EXPORT_SYMBOL(class_conn2export);
951
952 struct obd_device *class_exp2obd(struct obd_export *exp)
953 {
954         if (exp)
955                 return exp->exp_obd;
956         return NULL;
957 }
958 EXPORT_SYMBOL(class_exp2obd);
959
960 struct obd_import *class_exp2cliimp(struct obd_export *exp)
961 {
962         struct obd_device *obd = exp->exp_obd;
963         if (obd == NULL)
964                 return NULL;
965         return obd->u.cli.cl_import;
966 }
967 EXPORT_SYMBOL(class_exp2cliimp);
968
969 /* Export management functions */
970 static void class_export_destroy(struct obd_export *exp)
971 {
972         struct obd_device *obd = exp->exp_obd;
973         ENTRY;
974
975         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
976         LASSERT(obd != NULL);
977
978         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
979                exp->exp_client_uuid.uuid, obd->obd_name);
980
981         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
982         if (exp->exp_connection)
983                 ptlrpc_put_connection_superhack(exp->exp_connection);
984
985         LASSERT(list_empty(&exp->exp_outstanding_replies));
986         LASSERT(list_empty(&exp->exp_uncommitted_replies));
987         LASSERT(list_empty(&exp->exp_req_replay_queue));
988         LASSERT(list_empty(&exp->exp_hp_rpcs));
989         obd_destroy_export(exp);
990         /* self export doesn't hold a reference to an obd, although it
991          * exists until freeing of the obd */
992         if (exp != obd->obd_self_export)
993                 class_decref(obd, "export", exp);
994
995         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
996         EXIT;
997 }
998
/* Handle-table addref callback: take a reference on the export. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
1003
1004 static struct portals_handle_ops export_handle_ops = {
1005         .hop_addref = export_handle_addref,
1006         .hop_free   = NULL,
1007 };
1008
1009 struct obd_export *class_export_get(struct obd_export *exp)
1010 {
1011         atomic_inc(&exp->exp_refcount);
1012         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1013                atomic_read(&exp->exp_refcount));
1014         return exp;
1015 }
1016 EXPORT_SYMBOL(class_export_get);
1017
1018 void class_export_put(struct obd_export *exp)
1019 {
1020         LASSERT(exp != NULL);
1021         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1022         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1023                atomic_read(&exp->exp_refcount) - 1);
1024
1025         if (atomic_dec_and_test(&exp->exp_refcount)) {
1026                 struct obd_device *obd = exp->exp_obd;
1027
1028                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1029                        exp, exp->exp_client_uuid.uuid);
1030
1031                 /* release nid stat refererence */
1032                 lprocfs_exp_cleanup(exp);
1033
1034                 if (exp == obd->obd_self_export) {
1035                         /* self export should be destroyed without
1036                          * zombie thread as it doesn't hold a
1037                          * reference to obd and doesn't hold any
1038                          * resources */
1039                         class_export_destroy(exp);
1040                         /* self export is destroyed, no class
1041                          * references exist and it is safe to free
1042                          * obd */
1043                         class_free_dev(obd);
1044                 } else {
1045                         LASSERT(!list_empty(&exp->exp_obd_chain));
1046                         obd_zombie_export_add(exp);
1047                 }
1048
1049         }
1050 }
1051 EXPORT_SYMBOL(class_export_put);
1052
1053 static void obd_zombie_exp_cull(struct work_struct *ws)
1054 {
1055         struct obd_export *export;
1056
1057         export = container_of(ws, struct obd_export, exp_zombie_work);
1058         class_export_destroy(export);
1059 }
1060
1061 /* Creates a new export, adds it to the hash table, and returns a
1062  * pointer to it. The refcount is 2: one for the hash reference, and
1063  * one for the pointer returned by this function. */
1064 struct obd_export *__class_new_export(struct obd_device *obd,
1065                                       struct obd_uuid *cluuid, bool is_self)
1066 {
1067         struct obd_export *export;
1068         struct cfs_hash *hash = NULL;
1069         int rc = 0;
1070         ENTRY;
1071
1072         OBD_ALLOC_PTR(export);
1073         if (!export)
1074                 return ERR_PTR(-ENOMEM);
1075
1076         export->exp_conn_cnt = 0;
1077         export->exp_lock_hash = NULL;
1078         export->exp_flock_hash = NULL;
1079         /* 2 = class_handle_hash + last */
1080         atomic_set(&export->exp_refcount, 2);
1081         atomic_set(&export->exp_rpc_count, 0);
1082         atomic_set(&export->exp_cb_count, 0);
1083         atomic_set(&export->exp_locks_count, 0);
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085         INIT_LIST_HEAD(&export->exp_locks_list);
1086         spin_lock_init(&export->exp_locks_list_guard);
1087 #endif
1088         atomic_set(&export->exp_replay_count, 0);
1089         export->exp_obd = obd;
1090         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1091         spin_lock_init(&export->exp_uncommitted_replies_lock);
1092         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1093         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1094         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1095         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1096         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1097         class_handle_hash(&export->exp_handle, &export_handle_ops);
1098         export->exp_last_request_time = ktime_get_real_seconds();
1099         spin_lock_init(&export->exp_lock);
1100         spin_lock_init(&export->exp_rpc_lock);
1101         INIT_HLIST_NODE(&export->exp_uuid_hash);
1102         INIT_HLIST_NODE(&export->exp_nid_hash);
1103         INIT_HLIST_NODE(&export->exp_gen_hash);
1104         spin_lock_init(&export->exp_bl_list_lock);
1105         INIT_LIST_HEAD(&export->exp_bl_list);
1106         INIT_LIST_HEAD(&export->exp_stale_list);
1107         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1108
1109         export->exp_sp_peer = LUSTRE_SP_ANY;
1110         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1111         export->exp_client_uuid = *cluuid;
1112         obd_init_export(export);
1113
1114         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1115                 spin_lock(&obd->obd_dev_lock);
1116                 /* shouldn't happen, but might race */
1117                 if (obd->obd_stopping)
1118                         GOTO(exit_unlock, rc = -ENODEV);
1119
1120                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1121                 if (hash == NULL)
1122                         GOTO(exit_unlock, rc = -ENODEV);
1123                 spin_unlock(&obd->obd_dev_lock);
1124
1125                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1126                 if (rc != 0) {
1127                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1128                                       obd->obd_name, cluuid->uuid, rc);
1129                         GOTO(exit_err, rc = -EALREADY);
1130                 }
1131         }
1132
1133         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1134         spin_lock(&obd->obd_dev_lock);
1135         if (obd->obd_stopping) {
1136                 if (hash)
1137                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1138                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1139         }
1140
1141         if (!is_self) {
1142                 class_incref(obd, "export", export);
1143                 list_add_tail(&export->exp_obd_chain_timed,
1144                               &obd->obd_exports_timed);
1145                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1146                 obd->obd_num_exports++;
1147         } else {
1148                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1149                 INIT_LIST_HEAD(&export->exp_obd_chain);
1150         }
1151         spin_unlock(&obd->obd_dev_lock);
1152         if (hash)
1153                 cfs_hash_putref(hash);
1154         RETURN(export);
1155
1156 exit_unlock:
1157         spin_unlock(&obd->obd_dev_lock);
1158 exit_err:
1159         if (hash)
1160                 cfs_hash_putref(hash);
1161         class_handle_unhash(&export->exp_handle);
1162         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1163         obd_destroy_export(export);
1164         OBD_FREE_PTR(export);
1165         return ERR_PTR(rc);
1166 }
1167
/*
 * Create a regular (non-self) export of \a obd for client \a uuid.
 * Returns the export or an ERR_PTR(), as __class_new_export() does.
 */
struct obd_export *class_new_export(struct obd_device *obd,
                                    struct obd_uuid *uuid)
{
        const bool self_export = false;

        return __class_new_export(obd, uuid, self_export);
}
1173 EXPORT_SYMBOL(class_new_export);
1174
/*
 * Create the self export of \a obd with uuid \a uuid.
 * Returns the export or an ERR_PTR(), as __class_new_export() does.
 */
struct obd_export *class_new_export_self(struct obd_device *obd,
                                         struct obd_uuid *uuid)
{
        const bool self_export = true;

        return __class_new_export(obd, uuid, self_export);
}
1180
1181 void class_unlink_export(struct obd_export *exp)
1182 {
1183         class_handle_unhash(&exp->exp_handle);
1184
1185         if (exp->exp_obd->obd_self_export == exp) {
1186                 class_export_put(exp);
1187                 return;
1188         }
1189
1190         spin_lock(&exp->exp_obd->obd_dev_lock);
1191         /* delete an uuid-export hashitem from hashtables */
1192         if (!hlist_unhashed(&exp->exp_uuid_hash))
1193                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1194                              &exp->exp_client_uuid,
1195                              &exp->exp_uuid_hash);
1196
1197 #ifdef HAVE_SERVER_SUPPORT
1198         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1199                 struct tg_export_data   *ted = &exp->exp_target_data;
1200                 struct cfs_hash         *hash;
1201
1202                 /* Because obd_gen_hash will not be released until
1203                  * class_cleanup(), so hash should never be NULL here */
1204                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1205                 LASSERT(hash != NULL);
1206                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1207                              &exp->exp_gen_hash);
1208                 cfs_hash_putref(hash);
1209         }
1210 #endif /* HAVE_SERVER_SUPPORT */
1211
1212         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1213         list_del_init(&exp->exp_obd_chain_timed);
1214         exp->exp_obd->obd_num_exports--;
1215         spin_unlock(&exp->exp_obd->obd_dev_lock);
1216         atomic_inc(&obd_stale_export_num);
1217
1218         /* A reference is kept by obd_stale_exports list */
1219         obd_stale_export_put(exp);
1220 }
1221 EXPORT_SYMBOL(class_unlink_export);
1222
1223 /* Import management functions */
1224 static void obd_zombie_import_free(struct obd_import *imp)
1225 {
1226         ENTRY;
1227
1228         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1229                 imp->imp_obd->obd_name);
1230
1231         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1232
1233         ptlrpc_put_connection_superhack(imp->imp_connection);
1234
1235         while (!list_empty(&imp->imp_conn_list)) {
1236                 struct obd_import_conn *imp_conn;
1237
1238                 imp_conn = list_entry(imp->imp_conn_list.next,
1239                                       struct obd_import_conn, oic_item);
1240                 list_del_init(&imp_conn->oic_item);
1241                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1242                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1243         }
1244
1245         LASSERT(imp->imp_sec == NULL);
1246         class_decref(imp->imp_obd, "import", imp);
1247         OBD_FREE_PTR(imp);
1248         EXIT;
1249 }
1250
1251 struct obd_import *class_import_get(struct obd_import *import)
1252 {
1253         atomic_inc(&import->imp_refcount);
1254         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1255                atomic_read(&import->imp_refcount),
1256                import->imp_obd->obd_name);
1257         return import;
1258 }
1259 EXPORT_SYMBOL(class_import_get);
1260
1261 void class_import_put(struct obd_import *imp)
1262 {
1263         ENTRY;
1264
1265         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1266
1267         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1268                atomic_read(&imp->imp_refcount) - 1,
1269                imp->imp_obd->obd_name);
1270
1271         if (atomic_dec_and_test(&imp->imp_refcount)) {
1272                 CDEBUG(D_INFO, "final put import %p\n", imp);
1273                 obd_zombie_import_add(imp);
1274         }
1275
1276         /* catch possible import put race */
1277         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1278         EXIT;
1279 }
1280 EXPORT_SYMBOL(class_import_put);
1281
1282 static void init_imp_at(struct imp_at *at) {
1283         int i;
1284         at_init(&at->iat_net_latency, 0, 0);
1285         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1286                 /* max service estimates are tracked on the server side, so
1287                    don't use the AT history here, just use the last reported
1288                    val. (But keep hist for proc histogram, worst_ever) */
1289                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1290                         AT_FLG_NOHIST);
1291         }
1292 }
1293
1294 static void obd_zombie_imp_cull(struct work_struct *ws)
1295 {
1296         struct obd_import *import;
1297
1298         import = container_of(ws, struct obd_import, imp_zombie_work);
1299         obd_zombie_import_free(import);
1300 }
1301
1302 struct obd_import *class_new_import(struct obd_device *obd)
1303 {
1304         struct obd_import *imp;
1305         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1306
1307         OBD_ALLOC(imp, sizeof(*imp));
1308         if (imp == NULL)
1309                 return NULL;
1310
1311         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1312         INIT_LIST_HEAD(&imp->imp_replay_list);
1313         INIT_LIST_HEAD(&imp->imp_sending_list);
1314         INIT_LIST_HEAD(&imp->imp_delayed_list);
1315         INIT_LIST_HEAD(&imp->imp_committed_list);
1316         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1317         imp->imp_known_replied_xid = 0;
1318         imp->imp_replay_cursor = &imp->imp_committed_list;
1319         spin_lock_init(&imp->imp_lock);
1320         imp->imp_last_success_conn = 0;
1321         imp->imp_state = LUSTRE_IMP_NEW;
1322         imp->imp_obd = class_incref(obd, "import", imp);
1323         rwlock_init(&imp->imp_sec_lock);
1324         init_waitqueue_head(&imp->imp_recovery_waitq);
1325         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1326
1327         if (curr_pid_ns->child_reaper)
1328                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1329         else
1330                 imp->imp_sec_refpid = 1;
1331
1332         atomic_set(&imp->imp_refcount, 2);
1333         atomic_set(&imp->imp_unregistering, 0);
1334         atomic_set(&imp->imp_inflight, 0);
1335         atomic_set(&imp->imp_replay_inflight, 0);
1336         atomic_set(&imp->imp_inval_count, 0);
1337         INIT_LIST_HEAD(&imp->imp_conn_list);
1338         init_imp_at(&imp->imp_at);
1339
1340         /* the default magic is V2, will be used in connect RPC, and
1341          * then adjusted according to the flags in request/reply. */
1342         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1343
1344         return imp;
1345 }
1346 EXPORT_SYMBOL(class_new_import);
1347
1348 void class_destroy_import(struct obd_import *import)
1349 {
1350         LASSERT(import != NULL);
1351         LASSERT(import != LP_POISON);
1352
1353         spin_lock(&import->imp_lock);
1354         import->imp_generation++;
1355         spin_unlock(&import->imp_lock);
1356         class_import_put(import);
1357 }
1358 EXPORT_SYMBOL(class_destroy_import);
1359
1360 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1361
1362 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1363 {
1364         spin_lock(&exp->exp_locks_list_guard);
1365
1366         LASSERT(lock->l_exp_refs_nr >= 0);
1367
1368         if (lock->l_exp_refs_target != NULL &&
1369             lock->l_exp_refs_target != exp) {
1370                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1371                               exp, lock, lock->l_exp_refs_target);
1372         }
1373         if ((lock->l_exp_refs_nr ++) == 0) {
1374                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1375                 lock->l_exp_refs_target = exp;
1376         }
1377         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1378                lock, exp, lock->l_exp_refs_nr);
1379         spin_unlock(&exp->exp_locks_list_guard);
1380 }
1381 EXPORT_SYMBOL(__class_export_add_lock_ref);
1382
1383 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1384 {
1385         spin_lock(&exp->exp_locks_list_guard);
1386         LASSERT(lock->l_exp_refs_nr > 0);
1387         if (lock->l_exp_refs_target != exp) {
1388                 LCONSOLE_WARN("lock %p, "
1389                               "mismatching export pointers: %p, %p\n",
1390                               lock, lock->l_exp_refs_target, exp);
1391         }
1392         if (-- lock->l_exp_refs_nr == 0) {
1393                 list_del_init(&lock->l_exp_refs_link);
1394                 lock->l_exp_refs_target = NULL;
1395         }
1396         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1397                lock, exp, lock->l_exp_refs_nr);
1398         spin_unlock(&exp->exp_locks_list_guard);
1399 }
1400 EXPORT_SYMBOL(__class_export_del_lock_ref);
1401 #endif
1402
1403 /* A connection defines an export context in which preallocation can
1404    be managed. This releases the export pointer reference, and returns
1405    the export handle, so the export refcount is 1 when this function
1406    returns. */
1407 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1408                   struct obd_uuid *cluuid)
1409 {
1410         struct obd_export *export;
1411         LASSERT(conn != NULL);
1412         LASSERT(obd != NULL);
1413         LASSERT(cluuid != NULL);
1414         ENTRY;
1415
1416         export = class_new_export(obd, cluuid);
1417         if (IS_ERR(export))
1418                 RETURN(PTR_ERR(export));
1419
1420         conn->cookie = export->exp_handle.h_cookie;
1421         class_export_put(export);
1422
1423         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1424                cluuid->uuid, conn->cookie);
1425         RETURN(0);
1426 }
1427 EXPORT_SYMBOL(class_connect);
1428
1429 /* if export is involved in recovery then clean up related things */
1430 static void class_export_recovery_cleanup(struct obd_export *exp)
1431 {
1432         struct obd_device *obd = exp->exp_obd;
1433
1434         spin_lock(&obd->obd_recovery_task_lock);
1435         if (obd->obd_recovering) {
1436                 if (exp->exp_in_recovery) {
1437                         spin_lock(&exp->exp_lock);
1438                         exp->exp_in_recovery = 0;
1439                         spin_unlock(&exp->exp_lock);
1440                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1441                         atomic_dec(&obd->obd_connected_clients);
1442                 }
1443
1444                 /* if called during recovery then should update
1445                  * obd_stale_clients counter,
1446                  * lightweight exports are not counted */
1447                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1448                         exp->exp_obd->obd_stale_clients++;
1449         }
1450         spin_unlock(&obd->obd_recovery_task_lock);
1451
1452         spin_lock(&exp->exp_lock);
1453         /** Cleanup req replay fields */
1454         if (exp->exp_req_replay_needed) {
1455                 exp->exp_req_replay_needed = 0;
1456
1457                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1458                 atomic_dec(&obd->obd_req_replay_clients);
1459         }
1460
1461         /** Cleanup lock replay data */
1462         if (exp->exp_lock_replay_needed) {
1463                 exp->exp_lock_replay_needed = 0;
1464
1465                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1466                 atomic_dec(&obd->obd_lock_replay_clients);
1467         }
1468         spin_unlock(&exp->exp_lock);
1469 }
1470
1471 /* This function removes 1-3 references from the export:
1472  * 1 - for export pointer passed
1473  * and if disconnect really need
1474  * 2 - removing from hash
1475  * 3 - in client_unlink_export
1476  * The export pointer passed to this function can destroyed */
1477 int class_disconnect(struct obd_export *export)
1478 {
1479         int already_disconnected;
1480         ENTRY;
1481
1482         if (export == NULL) {
1483                 CWARN("attempting to free NULL export %p\n", export);
1484                 RETURN(-EINVAL);
1485         }
1486
1487         spin_lock(&export->exp_lock);
1488         already_disconnected = export->exp_disconnected;
1489         export->exp_disconnected = 1;
1490         /*  We hold references of export for uuid hash
1491          *  and nid_hash and export link at least. So
1492          *  it is safe to call cfs_hash_del in there.  */
1493         if (!hlist_unhashed(&export->exp_nid_hash))
1494                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1495                              &export->exp_connection->c_peer.nid,
1496                              &export->exp_nid_hash);
1497         spin_unlock(&export->exp_lock);
1498
1499         /* class_cleanup(), abort_recovery(), and class_fail_export()
1500          * all end up in here, and if any of them race we shouldn't
1501          * call extra class_export_puts(). */
1502         if (already_disconnected) {
1503                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1504                 GOTO(no_disconn, already_disconnected);
1505         }
1506
1507         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1508                export->exp_handle.h_cookie);
1509
1510         class_export_recovery_cleanup(export);
1511         class_unlink_export(export);
1512 no_disconn:
1513         class_export_put(export);
1514         RETURN(0);
1515 }
1516 EXPORT_SYMBOL(class_disconnect);
1517
1518 /* Return non-zero for a fully connected export */
1519 int class_connected_export(struct obd_export *exp)
1520 {
1521         int connected = 0;
1522
1523         if (exp) {
1524                 spin_lock(&exp->exp_lock);
1525                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1526                 spin_unlock(&exp->exp_lock);
1527         }
1528         return connected;
1529 }
1530 EXPORT_SYMBOL(class_connected_export);
1531
1532 static void class_disconnect_export_list(struct list_head *list,
1533                                          enum obd_option flags)
1534 {
1535         int rc;
1536         struct obd_export *exp;
1537         ENTRY;
1538
1539         /* It's possible that an export may disconnect itself, but
1540          * nothing else will be added to this list. */
1541         while (!list_empty(list)) {
1542                 exp = list_entry(list->next, struct obd_export,
1543                                  exp_obd_chain);
1544                 /* need for safe call CDEBUG after obd_disconnect */
1545                 class_export_get(exp);
1546
1547                 spin_lock(&exp->exp_lock);
1548                 exp->exp_flags = flags;
1549                 spin_unlock(&exp->exp_lock);
1550
1551                 if (obd_uuid_equals(&exp->exp_client_uuid,
1552                                     &exp->exp_obd->obd_uuid)) {
1553                         CDEBUG(D_HA,
1554                                "exp %p export uuid == obd uuid, don't discon\n",
1555                                exp);
1556                         /* Need to delete this now so we don't end up pointing
1557                          * to work_list later when this export is cleaned up. */
1558                         list_del_init(&exp->exp_obd_chain);
1559                         class_export_put(exp);
1560                         continue;
1561                 }
1562
1563                 class_export_get(exp);
1564                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1565                        "last request at %lld\n",
1566                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1567                        exp, exp->exp_last_request_time);
1568                 /* release one export reference anyway */
1569                 rc = obd_disconnect(exp);
1570
1571                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1572                        obd_export_nid2str(exp), exp, rc);
1573                 class_export_put(exp);
1574         }
1575         EXIT;
1576 }
1577
1578 void class_disconnect_exports(struct obd_device *obd)
1579 {
1580         struct list_head work_list;
1581         ENTRY;
1582
1583         /* Move all of the exports from obd_exports to a work list, en masse. */
1584         INIT_LIST_HEAD(&work_list);
1585         spin_lock(&obd->obd_dev_lock);
1586         list_splice_init(&obd->obd_exports, &work_list);
1587         list_splice_init(&obd->obd_delayed_exports, &work_list);
1588         spin_unlock(&obd->obd_dev_lock);
1589
1590         if (!list_empty(&work_list)) {
1591                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1592                        "disconnecting them\n", obd->obd_minor, obd);
1593                 class_disconnect_export_list(&work_list,
1594                                              exp_flags_from_obd(obd));
1595         } else
1596                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1597                        obd->obd_minor, obd);
1598         EXIT;
1599 }
1600 EXPORT_SYMBOL(class_disconnect_exports);
1601
1602 /* Remove exports that have not completed recovery.
1603  */
1604 void class_disconnect_stale_exports(struct obd_device *obd,
1605                                     int (*test_export)(struct obd_export *))
1606 {
1607         struct list_head work_list;
1608         struct obd_export *exp, *n;
1609         int evicted = 0;
1610         ENTRY;
1611
1612         INIT_LIST_HEAD(&work_list);
1613         spin_lock(&obd->obd_dev_lock);
1614         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1615                                  exp_obd_chain) {
1616                 /* don't count self-export as client */
1617                 if (obd_uuid_equals(&exp->exp_client_uuid,
1618                                     &exp->exp_obd->obd_uuid))
1619                         continue;
1620
1621                 /* don't evict clients which have no slot in last_rcvd
1622                  * (e.g. lightweight connection) */
1623                 if (exp->exp_target_data.ted_lr_idx == -1)
1624                         continue;
1625
1626                 spin_lock(&exp->exp_lock);
1627                 if (exp->exp_failed || test_export(exp)) {
1628                         spin_unlock(&exp->exp_lock);
1629                         continue;
1630                 }
1631                 exp->exp_failed = 1;
1632                 spin_unlock(&exp->exp_lock);
1633
1634                 list_move(&exp->exp_obd_chain, &work_list);
1635                 evicted++;
1636                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1637                        obd->obd_name, exp->exp_client_uuid.uuid,
1638                        obd_export_nid2str(exp));
1639                 print_export_data(exp, "EVICTING", 0, D_HA);
1640         }
1641         spin_unlock(&obd->obd_dev_lock);
1642
1643         if (evicted)
1644                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1645                               obd->obd_name, evicted);
1646
1647         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1648                                                  OBD_OPT_ABORT_RECOV);
1649         EXIT;
1650 }
1651 EXPORT_SYMBOL(class_disconnect_stale_exports);
1652
1653 void class_fail_export(struct obd_export *exp)
1654 {
1655         int rc, already_failed;
1656
1657         spin_lock(&exp->exp_lock);
1658         already_failed = exp->exp_failed;
1659         exp->exp_failed = 1;
1660         spin_unlock(&exp->exp_lock);
1661
1662         if (already_failed) {
1663                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1664                        exp, exp->exp_client_uuid.uuid);
1665                 return;
1666         }
1667
1668         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1669                exp, exp->exp_client_uuid.uuid);
1670
1671         if (obd_dump_on_timeout)
1672                 libcfs_debug_dumplog();
1673
1674         /* need for safe call CDEBUG after obd_disconnect */
1675         class_export_get(exp);
1676
1677         /* Most callers into obd_disconnect are removing their own reference
1678          * (request, for example) in addition to the one from the hash table.
1679          * We don't have such a reference here, so make one. */
1680         class_export_get(exp);
1681         rc = obd_disconnect(exp);
1682         if (rc)
1683                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1684         else
1685                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1686                        exp, exp->exp_client_uuid.uuid);
1687         class_export_put(exp);
1688 }
1689 EXPORT_SYMBOL(class_fail_export);
1690
1691 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1692 {
1693         struct cfs_hash *nid_hash;
1694         struct obd_export *doomed_exp = NULL;
1695         int exports_evicted = 0;
1696
1697         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1698
1699         spin_lock(&obd->obd_dev_lock);
1700         /* umount has run already, so evict thread should leave
1701          * its task to umount thread now */
1702         if (obd->obd_stopping) {
1703                 spin_unlock(&obd->obd_dev_lock);
1704                 return exports_evicted;
1705         }
1706         nid_hash = obd->obd_nid_hash;
1707         cfs_hash_getref(nid_hash);
1708         spin_unlock(&obd->obd_dev_lock);
1709
1710         do {
1711                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1712                 if (doomed_exp == NULL)
1713                         break;
1714
1715                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1716                          "nid %s found, wanted nid %s, requested nid %s\n",
1717                          obd_export_nid2str(doomed_exp),
1718                          libcfs_nid2str(nid_key), nid);
1719                 LASSERTF(doomed_exp != obd->obd_self_export,
1720                          "self-export is hashed by NID?\n");
1721                 exports_evicted++;
1722                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1723                               "request\n", obd->obd_name,
1724                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1725                               obd_export_nid2str(doomed_exp));
1726                 class_fail_export(doomed_exp);
1727                 class_export_put(doomed_exp);
1728         } while (1);
1729
1730         cfs_hash_putref(nid_hash);
1731
1732         if (!exports_evicted)
1733                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1734                        obd->obd_name, nid);
1735         return exports_evicted;
1736 }
1737 EXPORT_SYMBOL(obd_export_evict_by_nid);
1738
1739 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1740 {
1741         struct cfs_hash *uuid_hash;
1742         struct obd_export *doomed_exp = NULL;
1743         struct obd_uuid doomed_uuid;
1744         int exports_evicted = 0;
1745
1746         spin_lock(&obd->obd_dev_lock);
1747         if (obd->obd_stopping) {
1748                 spin_unlock(&obd->obd_dev_lock);
1749                 return exports_evicted;
1750         }
1751         uuid_hash = obd->obd_uuid_hash;
1752         cfs_hash_getref(uuid_hash);
1753         spin_unlock(&obd->obd_dev_lock);
1754
1755         obd_str2uuid(&doomed_uuid, uuid);
1756         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1757                 CERROR("%s: can't evict myself\n", obd->obd_name);
1758                 cfs_hash_putref(uuid_hash);
1759                 return exports_evicted;
1760         }
1761
1762         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1763
1764         if (doomed_exp == NULL) {
1765                 CERROR("%s: can't disconnect %s: no exports found\n",
1766                        obd->obd_name, uuid);
1767         } else {
1768                 CWARN("%s: evicting %s at adminstrative request\n",
1769                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1770                 class_fail_export(doomed_exp);
1771                 class_export_put(doomed_exp);
1772                 exports_evicted++;
1773         }
1774         cfs_hash_putref(uuid_hash);
1775
1776         return exports_evicted;
1777 }
1778
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() invokes it when it
 * is set and lock dumping was requested. Unset (NULL) by default. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1783
1784 static void print_export_data(struct obd_export *exp, const char *status,
1785                               int locks, int debug_level)
1786 {
1787         struct ptlrpc_reply_state *rs;
1788         struct ptlrpc_reply_state *first_reply = NULL;
1789         int nreplies = 0;
1790
1791         spin_lock(&exp->exp_lock);
1792         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1793                             rs_exp_list) {
1794                 if (nreplies == 0)
1795                         first_reply = rs;
1796                 nreplies++;
1797         }
1798         spin_unlock(&exp->exp_lock);
1799
1800         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1801                "%p %s %llu stale:%d\n",
1802                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1803                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1804                atomic_read(&exp->exp_rpc_count),
1805                atomic_read(&exp->exp_cb_count),
1806                atomic_read(&exp->exp_locks_count),
1807                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1808                nreplies, first_reply, nreplies > 3 ? "..." : "",
1809                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1810 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1811         if (locks && class_export_dump_hook != NULL)
1812                 class_export_dump_hook(exp);
1813 #endif
1814 }
1815
1816 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1817 {
1818         struct obd_export *exp;
1819
1820         spin_lock(&obd->obd_dev_lock);
1821         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1822                 print_export_data(exp, "ACTIVE", locks, debug_level);
1823         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1824                 print_export_data(exp, "UNLINKED", locks, debug_level);
1825         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1826                 print_export_data(exp, "DELAYED", locks, debug_level);
1827         spin_unlock(&obd->obd_dev_lock);
1828 }
1829
1830 void obd_exports_barrier(struct obd_device *obd)
1831 {
1832         int waited = 2;
1833         LASSERT(list_empty(&obd->obd_exports));
1834         spin_lock(&obd->obd_dev_lock);
1835         while (!list_empty(&obd->obd_unlinked_exports)) {
1836                 spin_unlock(&obd->obd_dev_lock);
1837                 set_current_state(TASK_UNINTERRUPTIBLE);
1838                 schedule_timeout(cfs_time_seconds(waited));
1839                 if (waited > 5 && is_power_of_2(waited)) {
1840                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1841                                       "more than %d seconds. "
1842                                       "The obd refcount = %d. Is it stuck?\n",
1843                                       obd->obd_name, waited,
1844                                       atomic_read(&obd->obd_refcount));
1845                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1846                 }
1847                 waited *= 2;
1848                 spin_lock(&obd->obd_dev_lock);
1849         }
1850         spin_unlock(&obd->obd_dev_lock);
1851 }
1852 EXPORT_SYMBOL(obd_exports_barrier);
1853
1854 /**
1855  * Add export to the obd_zombe thread and notify it.
1856  */
1857 static void obd_zombie_export_add(struct obd_export *exp) {
1858         atomic_dec(&obd_stale_export_num);
1859         spin_lock(&exp->exp_obd->obd_dev_lock);
1860         LASSERT(!list_empty(&exp->exp_obd_chain));
1861         list_del_init(&exp->exp_obd_chain);
1862         spin_unlock(&exp->exp_obd->obd_dev_lock);
1863
1864         queue_work(zombie_wq, &exp->exp_zombie_work);
1865 }
1866
1867 /**
1868  * Add import to the obd_zombe thread and notify it.
1869  */
1870 static void obd_zombie_import_add(struct obd_import *imp) {
1871         LASSERT(imp->imp_sec == NULL);
1872
1873         queue_work(zombie_wq, &imp->imp_zombie_work);
1874 }
1875
1876 /**
1877  * wait when obd_zombie import/export queues become empty
1878  */
1879 void obd_zombie_barrier(void)
1880 {
1881         flush_workqueue(zombie_wq);
1882 }
1883 EXPORT_SYMBOL(obd_zombie_barrier);
1884
1885
1886 struct obd_export *obd_stale_export_get(void)
1887 {
1888         struct obd_export *exp = NULL;
1889         ENTRY;
1890
1891         spin_lock(&obd_stale_export_lock);
1892         if (!list_empty(&obd_stale_exports)) {
1893                 exp = list_entry(obd_stale_exports.next,
1894                                  struct obd_export, exp_stale_list);
1895                 list_del_init(&exp->exp_stale_list);
1896         }
1897         spin_unlock(&obd_stale_export_lock);
1898
1899         if (exp) {
1900                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1901                        atomic_read(&obd_stale_export_num));
1902         }
1903         RETURN(exp);
1904 }
1905 EXPORT_SYMBOL(obd_stale_export_get);
1906
1907 void obd_stale_export_put(struct obd_export *exp)
1908 {
1909         ENTRY;
1910
1911         LASSERT(list_empty(&exp->exp_stale_list));
1912         if (exp->exp_lock_hash &&
1913             atomic_read(&exp->exp_lock_hash->hs_count)) {
1914                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1915                        atomic_read(&obd_stale_export_num));
1916
1917                 spin_lock_bh(&exp->exp_bl_list_lock);
1918                 spin_lock(&obd_stale_export_lock);
1919                 /* Add to the tail if there is no blocked locks,
1920                  * to the head otherwise. */
1921                 if (list_empty(&exp->exp_bl_list))
1922                         list_add_tail(&exp->exp_stale_list,
1923                                       &obd_stale_exports);
1924                 else
1925                         list_add(&exp->exp_stale_list,
1926                                  &obd_stale_exports);
1927
1928                 spin_unlock(&obd_stale_export_lock);
1929                 spin_unlock_bh(&exp->exp_bl_list_lock);
1930         } else {
1931                 class_export_put(exp);
1932         }
1933         EXIT;
1934 }
1935 EXPORT_SYMBOL(obd_stale_export_put);
1936
1937 /**
1938  * Adjust the position of the export in the stale list,
1939  * i.e. move to the head of the list if is needed.
1940  **/
1941 void obd_stale_export_adjust(struct obd_export *exp)
1942 {
1943         LASSERT(exp != NULL);
1944         spin_lock_bh(&exp->exp_bl_list_lock);
1945         spin_lock(&obd_stale_export_lock);
1946
1947         if (!list_empty(&exp->exp_stale_list) &&
1948             !list_empty(&exp->exp_bl_list))
1949                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1950
1951         spin_unlock(&obd_stale_export_lock);
1952         spin_unlock_bh(&exp->exp_bl_list_lock);
1953 }
1954 EXPORT_SYMBOL(obd_stale_export_adjust);
1955
1956 /**
1957  * start destroy zombie import/export thread
1958  */
1959 int obd_zombie_impexp_init(void)
1960 {
1961         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1962         if (!zombie_wq)
1963                 return -ENOMEM;
1964
1965         return 0;
1966 }
1967
1968 /**
1969  * stop destroy zombie import/export thread
1970  */
1971 void obd_zombie_impexp_stop(void)
1972 {
1973         destroy_workqueue(zombie_wq);
1974         LASSERT(list_empty(&obd_stale_exports));
1975 }
1976
1977 /***** Kernel-userspace comm helpers *******/
1978
1979 /* Get length of entire message, including header */
1980 int kuc_len(int payload_len)
1981 {
1982         return sizeof(struct kuc_hdr) + payload_len;
1983 }
1984 EXPORT_SYMBOL(kuc_len);
1985
1986 /* Get a pointer to kuc header, given a ptr to the payload
1987  * @param p Pointer to payload area
1988  * @returns Pointer to kuc header
1989  */
1990 struct kuc_hdr * kuc_ptr(void *p)
1991 {
1992         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1993         LASSERT(lh->kuc_magic == KUC_MAGIC);
1994         return lh;
1995 }
1996 EXPORT_SYMBOL(kuc_ptr);
1997
1998 /* Alloc space for a message, and fill in header
1999  * @return Pointer to payload area
2000  */
2001 void *kuc_alloc(int payload_len, int transport, int type)
2002 {
2003         struct kuc_hdr *lh;
2004         int len = kuc_len(payload_len);
2005
2006         OBD_ALLOC(lh, len);
2007         if (lh == NULL)
2008                 return ERR_PTR(-ENOMEM);
2009
2010         lh->kuc_magic = KUC_MAGIC;
2011         lh->kuc_transport = transport;
2012         lh->kuc_msgtype = type;
2013         lh->kuc_msglen = len;
2014
2015         return (void *)(lh + 1);
2016 }
2017 EXPORT_SYMBOL(kuc_alloc);
2018
/* Free a message obtained from kuc_alloc()
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload length the message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        /* kuc_ptr() also checks the header magic */
        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
2025 EXPORT_SYMBOL(kuc_free);
2026
2027 struct obd_request_slot_waiter {
2028         struct list_head        orsw_entry;
2029         wait_queue_head_t       orsw_waitq;
2030         bool                    orsw_signaled;
2031 };
2032
2033 static bool obd_request_slot_avail(struct client_obd *cli,
2034                                    struct obd_request_slot_waiter *orsw)
2035 {
2036         bool avail;
2037
2038         spin_lock(&cli->cl_loi_list_lock);
2039         avail = !!list_empty(&orsw->orsw_entry);
2040         spin_unlock(&cli->cl_loi_list_lock);
2041
2042         return avail;
2043 };
2044
2045 /*
2046  * For network flow control, the RPC sponsor needs to acquire a credit
2047  * before sending the RPC. The credits count for a connection is defined
2048  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2049  * the subsequent RPC sponsors need to wait until others released their
2050  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2051  */
2052 int obd_get_request_slot(struct client_obd *cli)
2053 {
2054         struct obd_request_slot_waiter   orsw;
2055         struct l_wait_info               lwi;
2056         int                              rc;
2057
2058         spin_lock(&cli->cl_loi_list_lock);
2059         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2060                 cli->cl_rpcs_in_flight++;
2061                 spin_unlock(&cli->cl_loi_list_lock);
2062                 return 0;
2063         }
2064
2065         init_waitqueue_head(&orsw.orsw_waitq);
2066         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2067         orsw.orsw_signaled = false;
2068         spin_unlock(&cli->cl_loi_list_lock);
2069
2070         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2071         rc = l_wait_event(orsw.orsw_waitq,
2072                           obd_request_slot_avail(cli, &orsw) ||
2073                           orsw.orsw_signaled,
2074                           &lwi);
2075
2076         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2077          * freed but other (such as obd_put_request_slot) is using it. */
2078         spin_lock(&cli->cl_loi_list_lock);
2079         if (rc != 0) {
2080                 if (!orsw.orsw_signaled) {
2081                         if (list_empty(&orsw.orsw_entry))
2082                                 cli->cl_rpcs_in_flight--;
2083                         else
2084                                 list_del(&orsw.orsw_entry);
2085                 }
2086         }
2087
2088         if (orsw.orsw_signaled) {
2089                 LASSERT(list_empty(&orsw.orsw_entry));
2090
2091                 rc = -EINTR;
2092         }
2093         spin_unlock(&cli->cl_loi_list_lock);
2094
2095         return rc;
2096 }
2097 EXPORT_SYMBOL(obd_get_request_slot);
2098
2099 void obd_put_request_slot(struct client_obd *cli)
2100 {
2101         struct obd_request_slot_waiter *orsw;
2102
2103         spin_lock(&cli->cl_loi_list_lock);
2104         cli->cl_rpcs_in_flight--;
2105
2106         /* If there is free slot, wakeup the first waiter. */
2107         if (!list_empty(&cli->cl_flight_waiters) &&
2108             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2109                 orsw = list_entry(cli->cl_flight_waiters.next,
2110                                   struct obd_request_slot_waiter, orsw_entry);
2111                 list_del_init(&orsw->orsw_entry);
2112                 cli->cl_rpcs_in_flight++;
2113                 wake_up(&orsw->orsw_waitq);
2114         }
2115         spin_unlock(&cli->cl_loi_list_lock);
2116 }
2117 EXPORT_SYMBOL(obd_put_request_slot);
2118
2119 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2120 {
2121         return cli->cl_max_rpcs_in_flight;
2122 }
2123 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2124
2125 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2126 {
2127         struct obd_request_slot_waiter *orsw;
2128         __u32                           old;
2129         int                             diff;
2130         int                             i;
2131         const char *type_name;
2132         int                             rc;
2133
2134         if (max > OBD_MAX_RIF_MAX || max < 1)
2135                 return -ERANGE;
2136
2137         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2138         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2139                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2140                  * strictly lower that max_rpcs_in_flight */
2141                 if (max < 2) {
2142                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2143                                "because it must be higher than "
2144                                "max_mod_rpcs_in_flight value",
2145                                cli->cl_import->imp_obd->obd_name);
2146                         return -ERANGE;
2147                 }
2148                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2149                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2150                         if (rc != 0)
2151                                 return rc;
2152                 }
2153         }
2154
2155         spin_lock(&cli->cl_loi_list_lock);
2156         old = cli->cl_max_rpcs_in_flight;
2157         cli->cl_max_rpcs_in_flight = max;
2158         client_adjust_max_dirty(cli);
2159
2160         diff = max - old;
2161
2162         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2163         for (i = 0; i < diff; i++) {
2164                 if (list_empty(&cli->cl_flight_waiters))
2165                         break;
2166
2167                 orsw = list_entry(cli->cl_flight_waiters.next,
2168                                   struct obd_request_slot_waiter, orsw_entry);
2169                 list_del_init(&orsw->orsw_entry);
2170                 cli->cl_rpcs_in_flight++;
2171                 wake_up(&orsw->orsw_waitq);
2172         }
2173         spin_unlock(&cli->cl_loi_list_lock);
2174
2175         return 0;
2176 }
2177 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2178
2179 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2180 {
2181         return cli->cl_max_mod_rpcs_in_flight;
2182 }
2183 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2184
2185 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2186 {
2187         struct obd_connect_data *ocd;
2188         __u16 maxmodrpcs;
2189         __u16 prev;
2190
2191         if (max > OBD_MAX_RIF_MAX || max < 1)
2192                 return -ERANGE;
2193
2194         /* cannot exceed or equal max_rpcs_in_flight */
2195         if (max >= cli->cl_max_rpcs_in_flight) {
2196                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2197                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2198                        cli->cl_import->imp_obd->obd_name,
2199                        max, cli->cl_max_rpcs_in_flight);
2200                 return -ERANGE;
2201         }
2202
2203         /* cannot exceed max modify RPCs in flight supported by the server */
2204         ocd = &cli->cl_import->imp_connect_data;
2205         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2206                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2207         else
2208                 maxmodrpcs = 1;
2209         if (max > maxmodrpcs) {
2210                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2211                        "higher than max_mod_rpcs_per_client value (%hu) "
2212                        "returned by the server at connection\n",
2213                        cli->cl_import->imp_obd->obd_name,
2214                        max, maxmodrpcs);
2215                 return -ERANGE;
2216         }
2217
2218         spin_lock(&cli->cl_mod_rpcs_lock);
2219
2220         prev = cli->cl_max_mod_rpcs_in_flight;
2221         cli->cl_max_mod_rpcs_in_flight = max;
2222
2223         /* wakeup waiters if limit has been increased */
2224         if (cli->cl_max_mod_rpcs_in_flight > prev)
2225                 wake_up(&cli->cl_mod_rpcs_waitq);
2226
2227         spin_unlock(&cli->cl_mod_rpcs_lock);
2228
2229         return 0;
2230 }
2231 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2232
2233 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2234                                struct seq_file *seq)
2235 {
2236         unsigned long mod_tot = 0, mod_cum;
2237         struct timespec64 now;
2238         int i;
2239
2240         ktime_get_real_ts64(&now);
2241
2242         spin_lock(&cli->cl_mod_rpcs_lock);
2243
2244         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2245                    (s64)now.tv_sec, now.tv_nsec);
2246         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2247                    cli->cl_mod_rpcs_in_flight);
2248
2249         seq_printf(seq, "\n\t\t\tmodify\n");
2250         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2251
2252         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2253
2254         mod_cum = 0;
2255         for (i = 0; i < OBD_HIST_MAX; i++) {
2256                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2257                 mod_cum += mod;
2258                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2259                            i, mod, pct(mod, mod_tot),
2260                            pct(mod_cum, mod_tot));
2261                 if (mod_cum == mod_tot)
2262                         break;
2263         }
2264
2265         spin_unlock(&cli->cl_mod_rpcs_lock);
2266
2267         return 0;
2268 }
2269 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2270
2271 /* The number of modify RPCs sent in parallel is limited
2272  * because the server has a finite number of slots per client to
2273  * store request result and ensure reply reconstruction when needed.
2274  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2275  * that takes into account server limit and cl_max_rpcs_in_flight
2276  * value.
2277  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2278  * one close request is allowed above the maximum.
2279  */
2280 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2281                                                  bool close_req)
2282 {
2283         bool avail;
2284
2285         /* A slot is available if
2286          * - number of modify RPCs in flight is less than the max
2287          * - it's a close RPC and no other close request is in flight
2288          */
2289         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2290                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2291
2292         return avail;
2293 }
2294
2295 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2296                                          bool close_req)
2297 {
2298         bool avail;
2299
2300         spin_lock(&cli->cl_mod_rpcs_lock);
2301         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2302         spin_unlock(&cli->cl_mod_rpcs_lock);
2303         return avail;
2304 }
2305
2306 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2307 {
2308         if (it != NULL &&
2309             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2310              it->it_op == IT_READDIR ||
2311              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2312                         return true;
2313         return false;
2314 }
2315
2316 /* Get a modify RPC slot from the obd client @cli according
2317  * to the kind of operation @opc that is going to be sent
2318  * and the intent @it of the operation if it applies.
2319  * If the maximum number of modify RPCs in flight is reached
2320  * the thread is put to sleep.
2321  * Returns the tag to be set in the request message. Tag 0
2322  * is reserved for non-modifying requests.
2323  */
2324 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2325                            struct lookup_intent *it)
2326 {
2327         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2328         bool                    close_req = false;
2329         __u16                   i, max;
2330
2331         /* read-only metadata RPCs don't consume a slot on MDT
2332          * for reply reconstruction
2333          */
2334         if (obd_skip_mod_rpc_slot(it))
2335                 return 0;
2336
2337         if (opc == MDS_CLOSE)
2338                 close_req = true;
2339
2340         do {
2341                 spin_lock(&cli->cl_mod_rpcs_lock);
2342                 max = cli->cl_max_mod_rpcs_in_flight;
2343                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2344                         /* there is a slot available */
2345                         cli->cl_mod_rpcs_in_flight++;
2346                         if (close_req)
2347                                 cli->cl_close_rpcs_in_flight++;
2348                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2349                                          cli->cl_mod_rpcs_in_flight);
2350                         /* find a free tag */
2351                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2352                                                 max + 1);
2353                         LASSERT(i < OBD_MAX_RIF_MAX);
2354                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2355                         spin_unlock(&cli->cl_mod_rpcs_lock);
2356                         /* tag 0 is reserved for non-modify RPCs */
2357                         return i + 1;
2358                 }
2359                 spin_unlock(&cli->cl_mod_rpcs_lock);
2360
2361                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2362                        "opc %u, max %hu\n",
2363                        cli->cl_import->imp_obd->obd_name, opc, max);
2364
2365                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2366                                        obd_mod_rpc_slot_avail(cli, close_req),
2367                                        &lwi);
2368         } while (true);
2369 }
2370 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2371
2372 /* Put a modify RPC slot from the obd client @cli according
2373  * to the kind of operation @opc that has been sent and the
2374  * intent @it of the operation if it applies.
2375  */
2376 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2377                           struct lookup_intent *it, __u16 tag)
2378 {
2379         bool                    close_req = false;
2380
2381         if (obd_skip_mod_rpc_slot(it))
2382                 return;
2383
2384         if (opc == MDS_CLOSE)
2385                 close_req = true;
2386
2387         spin_lock(&cli->cl_mod_rpcs_lock);
2388         cli->cl_mod_rpcs_in_flight--;
2389         if (close_req)
2390                 cli->cl_close_rpcs_in_flight--;
2391         /* release the tag in the bitmap */
2392         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2393         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2394         spin_unlock(&cli->cl_mod_rpcs_lock);
2395         wake_up(&cli->cl_mod_rpcs_waitq);
2396 }
2397 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2398