Whamcloud - gitweb
LU-4423 obdclass: use workqueue for zombie management
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Spinlock taken whenever the obd_types list is walked or modified. */
49 static DEFINE_SPINLOCK(obd_types_lock);
/* Registered obd types, linked through obd_type::typ_chain. */
50 static LIST_HEAD(obd_types);
/* Read/write lock taken around accesses to the obd_devs[] table. */
51 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a registered device sits in the slot named by its obd_minor. */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53
/*
 * Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches().  obd_device_cachep backs obd_device_alloc() and
 * obd_device_free(); obdo_cachep is exported for use outside this file.
 */
54 static struct kmem_cache *obd_device_cachep;
55 struct kmem_cache *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 static struct kmem_cache *import_cachep;
58
/*
 * NOTE(review): presumably the workqueue that processes zombie exports and
 * imports; its users are not visible in this part of the file -- confirm.
 */
59 static struct workqueue_struct *zombie_wq;
60
/* Forward declarations of static helpers defined later in this file. */
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64                               const char *status, int locks, int debug_level);
65
/*
 * NOTE(review): list, lock and counter for "stale" exports; by their names
 * the lock guards the list and the atomic counts its entries, but the code
 * using them is not visible here -- confirm.
 */
66 static LIST_HEAD(obd_stale_exports);
67 static DEFINE_SPINLOCK(obd_stale_export_lock);
68 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
69
/*
 * Exported function pointer taking a ptlrpc_connection.  It is not assigned
 * in the visible part of this file, so it starts out NULL.
 */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
72
73 /*
74  * support functions: we could use inter-module communication, but this
75  * is more portable to other OS's
76  */
77 static struct obd_device *obd_device_alloc(void)
78 {
79         struct obd_device *obd;
80
81         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
82         if (obd != NULL) {
83                 obd->obd_magic = OBD_DEVICE_MAGIC;
84         }
85         return obd;
86 }
87
88 static void obd_device_free(struct obd_device *obd)
89 {
90         LASSERT(obd != NULL);
91         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
93         if (obd->obd_namespace != NULL) {
94                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95                        obd, obd->obd_namespace, obd->obd_force);
96                 LBUG();
97         }
98         lu_ref_fini(&obd->obd_reference);
99         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 }
101
102 struct obd_type *class_search_type(const char *name)
103 {
104         struct list_head *tmp;
105         struct obd_type *type;
106
107         spin_lock(&obd_types_lock);
108         list_for_each(tmp, &obd_types) {
109                 type = list_entry(tmp, struct obd_type, typ_chain);
110                 if (strcmp(type->typ_name, name) == 0) {
111                         spin_unlock(&obd_types_lock);
112                         return type;
113                 }
114         }
115         spin_unlock(&obd_types_lock);
116         return NULL;
117 }
118 EXPORT_SYMBOL(class_search_type);
119
120 struct obd_type *class_get_type(const char *name)
121 {
122         struct obd_type *type = class_search_type(name);
123
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
125         if (!type) {
126                 const char *modname = name;
127
128                 if (strcmp(modname, "obdfilter") == 0)
129                         modname = "ofd";
130
131                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132                         modname = LUSTRE_OSP_NAME;
133
134                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135                         modname = LUSTRE_MDT_NAME;
136
137                 if (!request_module("%s", modname)) {
138                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139                         type = class_search_type(name);
140                 } else {
141                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
142                                            modname);
143                 }
144         }
145 #endif
146         if (type) {
147                 spin_lock(&type->obd_type_lock);
148                 type->typ_refcnt++;
149                 try_module_get(type->typ_dt_ops->o_owner);
150                 spin_unlock(&type->obd_type_lock);
151         }
152         return type;
153 }
154
155 void class_put_type(struct obd_type *type)
156 {
157         LASSERT(type);
158         spin_lock(&type->obd_type_lock);
159         type->typ_refcnt--;
160         module_put(type->typ_dt_ops->o_owner);
161         spin_unlock(&type->obd_type_lock);
162 }
163
/* kobject release callback for class_ktype: frees the kobject itself. */
static void class_sysfs_release(struct kobject *kobj)
{
        OBD_FREE_PTR(kobj);
}
168
169 static struct kobj_type class_ktype = {
170         .sysfs_ops      = &lustre_sysfs_ops,
171         .release        = class_sysfs_release,
172 };
173
174 struct kobject *class_setup_tunables(const char *name)
175 {
176         struct kobject *kobj;
177         int rc;
178
179 #ifdef HAVE_SERVER_SUPPORT
180         kobj = kset_find_obj(lustre_kset, name);
181         if (kobj)
182                 return kobj;
183 #endif
184         OBD_ALLOC(kobj, sizeof(*kobj));
185         if (!kobj)
186                 return ERR_PTR(-ENOMEM);
187
188         kobj->kset = lustre_kset;
189         kobject_init(kobj, &class_ktype);
190         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
191         if (rc) {
192                 kobject_put(kobj);
193                 return ERR_PTR(rc);
194         }
195         return kobj;
196 }
197 EXPORT_SYMBOL(class_setup_tunables);
198
199 #define CLASS_MAX_NAME 1024
200
201 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
202                         bool enable_proc, struct lprocfs_vars *vars,
203                         const char *name, struct lu_device_type *ldt)
204 {
205         struct obd_type *type;
206 #ifdef HAVE_SERVER_SUPPORT
207         struct qstr dname;
208 #endif /* HAVE_SERVER_SUPPORT */
209         int rc = 0;
210
211         ENTRY;
212         /* sanity check */
213         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
214
215         if (class_search_type(name)) {
216                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
217                 RETURN(-EEXIST);
218         }
219
220         rc = -ENOMEM;
221         OBD_ALLOC(type, sizeof(*type));
222         if (type == NULL)
223                 RETURN(rc);
224
225         OBD_ALLOC_PTR(type->typ_dt_ops);
226         OBD_ALLOC_PTR(type->typ_md_ops);
227         OBD_ALLOC(type->typ_name, strlen(name) + 1);
228
229         if (type->typ_dt_ops == NULL ||
230             type->typ_md_ops == NULL ||
231             type->typ_name == NULL)
232                 GOTO (failed, rc);
233
234         *(type->typ_dt_ops) = *dt_ops;
235         /* md_ops is optional */
236         if (md_ops)
237                 *(type->typ_md_ops) = *md_ops;
238         strcpy(type->typ_name, name);
239         spin_lock_init(&type->obd_type_lock);
240
241 #ifdef CONFIG_PROC_FS
242         if (enable_proc) {
243                 type->typ_procroot = lprocfs_register(type->typ_name,
244                                                       proc_lustre_root,
245                                                       vars, type);
246                 if (IS_ERR(type->typ_procroot)) {
247                         rc = PTR_ERR(type->typ_procroot);
248                         type->typ_procroot = NULL;
249                         GOTO(failed, rc);
250                 }
251         }
252 #endif
253 #ifdef HAVE_SERVER_SUPPORT
254         dname.name = name;
255         dname.len = strlen(dname.name);
256         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
257                                        dname.len);
258         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
259         if (type->typ_debugfs_entry) {
260                 dput(type->typ_debugfs_entry);
261                 type->typ_sym_filter = true;
262                 goto dir_exist;
263         }
264 #endif /* HAVE_SERVER_SUPPORT */
265
266         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
267                                                     debugfs_lustre_root,
268                                                     NULL, type);
269         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
270                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
271                                              : -ENOMEM;
272                 type->typ_debugfs_entry = NULL;
273                 GOTO(failed, rc);
274         }
275 #ifdef HAVE_SERVER_SUPPORT
276 dir_exist:
277 #endif
278         type->typ_kobj = class_setup_tunables(type->typ_name);
279         if (IS_ERR(type->typ_kobj))
280                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281
282         if (ldt) {
283                 type->typ_lu = ldt;
284                 rc = lu_device_type_init(ldt);
285                 if (rc) {
286                         kobject_put(type->typ_kobj);
287                         GOTO(failed, rc);
288                 }
289         }
290
291         spin_lock(&obd_types_lock);
292         list_add(&type->typ_chain, &obd_types);
293         spin_unlock(&obd_types_lock);
294
295         RETURN(0);
296
297 failed:
298 #ifdef HAVE_SERVER_SUPPORT
299         if (type->typ_sym_filter)
300                 type->typ_debugfs_entry = NULL;
301 #endif
302         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
303                 ldebugfs_remove(&type->typ_debugfs_entry);
304         if (type->typ_name != NULL) {
305 #ifdef CONFIG_PROC_FS
306                 if (type->typ_procroot != NULL)
307                         remove_proc_subtree(type->typ_name, proc_lustre_root);
308 #endif
309                 OBD_FREE(type->typ_name, strlen(name) + 1);
310         }
311         if (type->typ_md_ops != NULL)
312                 OBD_FREE_PTR(type->typ_md_ops);
313         if (type->typ_dt_ops != NULL)
314                 OBD_FREE_PTR(type->typ_dt_ops);
315         OBD_FREE(type, sizeof(*type));
316         RETURN(rc);
317 }
318 EXPORT_SYMBOL(class_register_type);
319
320 int class_unregister_type(const char *name)
321 {
322         struct obd_type *type = class_search_type(name);
323         ENTRY;
324
325         if (!type) {
326                 CERROR("unknown obd type\n");
327                 RETURN(-EINVAL);
328         }
329
330         if (type->typ_refcnt) {
331                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 OBD_FREE_PTR(type->typ_dt_ops);
335                 OBD_FREE_PTR(type->typ_md_ops);
336                 RETURN(-EBUSY);
337         }
338
339         kobject_put(type->typ_kobj);
340
341         /* we do not use type->typ_procroot as for compatibility purposes
342          * other modules can share names (i.e. lod can use lov entry). so
343          * we can't reference pointer as it can get invalided when another
344          * module removes the entry */
345 #ifdef CONFIG_PROC_FS
346         if (type->typ_procroot != NULL)
347                 remove_proc_subtree(type->typ_name, proc_lustre_root);
348         if (type->typ_procsym != NULL)
349                 lprocfs_remove(&type->typ_procsym);
350 #endif
351 #ifdef HAVE_SERVER_SUPPORT
352         if (type->typ_sym_filter)
353                 type->typ_debugfs_entry = NULL;
354 #endif
355         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
356                 ldebugfs_remove(&type->typ_debugfs_entry);
357
358         if (type->typ_lu)
359                 lu_device_type_fini(type->typ_lu);
360
361         spin_lock(&obd_types_lock);
362         list_del(&type->typ_chain);
363         spin_unlock(&obd_types_lock);
364         OBD_FREE(type->typ_name, strlen(name) + 1);
365         if (type->typ_dt_ops != NULL)
366                 OBD_FREE_PTR(type->typ_dt_ops);
367         if (type->typ_md_ops != NULL)
368                 OBD_FREE_PTR(type->typ_md_ops);
369         OBD_FREE(type, sizeof(*type));
370         RETURN(0);
371 } /* class_unregister_type */
372 EXPORT_SYMBOL(class_unregister_type);
373
374 /**
375  * Create a new obd device.
376  *
377  * Allocate the new obd_device and initialize it.
378  *
379  * \param[in] type_name obd device type string.
380  * \param[in] name      obd device name.
381  * \param[in] uuid      obd device UUID
382  *
383  * \retval newdev         pointer to created obd_device
384  * \retval ERR_PTR(errno) on error
385  */
386 struct obd_device *class_newdev(const char *type_name, const char *name,
387                                 const char *uuid)
388 {
389         struct obd_device *newdev;
390         struct obd_type *type = NULL;
391         ENTRY;
392
393         if (strlen(name) >= MAX_OBD_NAME) {
394                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
395                 RETURN(ERR_PTR(-EINVAL));
396         }
397
398         type = class_get_type(type_name);
399         if (type == NULL){
400                 CERROR("OBD: unknown type: %s\n", type_name);
401                 RETURN(ERR_PTR(-ENODEV));
402         }
403
404         newdev = obd_device_alloc();
405         if (newdev == NULL) {
406                 class_put_type(type);
407                 RETURN(ERR_PTR(-ENOMEM));
408         }
409         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
410         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
411         newdev->obd_type = type;
412         newdev->obd_minor = -1;
413
414         rwlock_init(&newdev->obd_pool_lock);
415         newdev->obd_pool_limit = 0;
416         newdev->obd_pool_slv = 0;
417
418         INIT_LIST_HEAD(&newdev->obd_exports);
419         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
420         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
421         INIT_LIST_HEAD(&newdev->obd_exports_timed);
422         INIT_LIST_HEAD(&newdev->obd_nid_stats);
423         spin_lock_init(&newdev->obd_nid_lock);
424         spin_lock_init(&newdev->obd_dev_lock);
425         mutex_init(&newdev->obd_dev_mutex);
426         spin_lock_init(&newdev->obd_osfs_lock);
427         /* newdev->obd_osfs_age must be set to a value in the distant
428          * past to guarantee a fresh statfs is fetched on mount. */
429         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
430
431         /* XXX belongs in setup not attach  */
432         init_rwsem(&newdev->obd_observer_link_sem);
433         /* recovery data */
434         init_timer(&newdev->obd_recovery_timer);
435         spin_lock_init(&newdev->obd_recovery_task_lock);
436         init_waitqueue_head(&newdev->obd_next_transno_waitq);
437         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
438         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
439         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
440         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
441         INIT_LIST_HEAD(&newdev->obd_evict_list);
442         INIT_LIST_HEAD(&newdev->obd_lwp_list);
443
444         llog_group_init(&newdev->obd_olg);
445         /* Detach drops this */
446         atomic_set(&newdev->obd_refcount, 1);
447         lu_ref_init(&newdev->obd_reference);
448         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
449
450         newdev->obd_conn_inprogress = 0;
451
452         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
453
454         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
455                newdev->obd_name, newdev);
456
457         return newdev;
458 }
459
460 /**
461  * Free obd device.
462  *
463  * \param[in] obd obd_device to be freed
464  *
465  * \retval none
466  */
467 void class_free_dev(struct obd_device *obd)
468 {
469         struct obd_type *obd_type = obd->obd_type;
470
471         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
472                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
474                  "obd %p != obd_devs[%d] %p\n",
475                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
476         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
477                  "obd_refcount should be 0, not %d\n",
478                  atomic_read(&obd->obd_refcount));
479         LASSERT(obd_type != NULL);
480
481         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
482                obd->obd_name, obd->obd_type->typ_name);
483
484         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
485                          obd->obd_name, obd->obd_uuid.uuid);
486         if (obd->obd_stopping) {
487                 int err;
488
489                 /* If we're not stopping, we were never set up */
490                 err = obd_cleanup(obd);
491                 if (err)
492                         CERROR("Cleanup %s returned %d\n",
493                                 obd->obd_name, err);
494         }
495
496         obd_device_free(obd);
497
498         class_put_type(obd_type);
499 }
500
501 /**
502  * Unregister obd device.
503  *
504  * Free slot in obd_dev[] used by \a obd.
505  *
506  * \param[in] new_obd obd_device to be unregistered
507  *
508  * \retval none
509  */
510 void class_unregister_device(struct obd_device *obd)
511 {
512         write_lock(&obd_dev_lock);
513         if (obd->obd_minor >= 0) {
514                 LASSERT(obd_devs[obd->obd_minor] == obd);
515                 obd_devs[obd->obd_minor] = NULL;
516                 obd->obd_minor = -1;
517         }
518         write_unlock(&obd_dev_lock);
519 }
520
521 /**
522  * Register obd device.
523  *
524  * Find free slot in obd_devs[], fills it with \a new_obd.
525  *
526  * \param[in] new_obd obd_device to be registered
527  *
528  * \retval 0          success
529  * \retval -EEXIST    device with this name is registered
530  * \retval -EOVERFLOW obd_devs[] is full
531  */
532 int class_register_device(struct obd_device *new_obd)
533 {
534         int ret = 0;
535         int i;
536         int new_obd_minor = 0;
537         bool minor_assign = false;
538         bool retried = false;
539
540 again:
541         write_lock(&obd_dev_lock);
542         for (i = 0; i < class_devno_max(); i++) {
543                 struct obd_device *obd = class_num2obd(i);
544
545                 if (obd != NULL &&
546                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
547
548                         if (!retried) {
549                                 write_unlock(&obd_dev_lock);
550
551                                 /* the obd_device could be waited to be
552                                  * destroyed by the "obd_zombie_impexp_thread".
553                                  */
554                                 obd_zombie_barrier();
555                                 retried = true;
556                                 goto again;
557                         }
558
559                         CERROR("%s: already exists, won't add\n",
560                                obd->obd_name);
561                         /* in case we found a free slot before duplicate */
562                         minor_assign = false;
563                         ret = -EEXIST;
564                         break;
565                 }
566                 if (!minor_assign && obd == NULL) {
567                         new_obd_minor = i;
568                         minor_assign = true;
569                 }
570         }
571
572         if (minor_assign) {
573                 new_obd->obd_minor = new_obd_minor;
574                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
575                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
576                 obd_devs[new_obd_minor] = new_obd;
577         } else {
578                 if (ret == 0) {
579                         ret = -EOVERFLOW;
580                         CERROR("%s: all %u/%u devices used, increase "
581                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
582                                i, class_devno_max(), ret);
583                 }
584         }
585         write_unlock(&obd_dev_lock);
586
587         RETURN(ret);
588 }
589
590 static int class_name2dev_nolock(const char *name)
591 {
592         int i;
593
594         if (!name)
595                 return -1;
596
597         for (i = 0; i < class_devno_max(); i++) {
598                 struct obd_device *obd = class_num2obd(i);
599
600                 if (obd && strcmp(name, obd->obd_name) == 0) {
601                         /* Make sure we finished attaching before we give
602                            out any references */
603                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
604                         if (obd->obd_attached) {
605                                 return i;
606                         }
607                         break;
608                 }
609         }
610
611         return -1;
612 }
613
614 int class_name2dev(const char *name)
615 {
616         int i;
617
618         if (!name)
619                 return -1;
620
621         read_lock(&obd_dev_lock);
622         i = class_name2dev_nolock(name);
623         read_unlock(&obd_dev_lock);
624
625         return i;
626 }
627 EXPORT_SYMBOL(class_name2dev);
628
629 struct obd_device *class_name2obd(const char *name)
630 {
631         int dev = class_name2dev(name);
632
633         if (dev < 0 || dev > class_devno_max())
634                 return NULL;
635         return class_num2obd(dev);
636 }
637 EXPORT_SYMBOL(class_name2obd);
638
639 int class_uuid2dev_nolock(struct obd_uuid *uuid)
640 {
641         int i;
642
643         for (i = 0; i < class_devno_max(); i++) {
644                 struct obd_device *obd = class_num2obd(i);
645
646                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
647                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
648                         return i;
649                 }
650         }
651
652         return -1;
653 }
654
655 int class_uuid2dev(struct obd_uuid *uuid)
656 {
657         int i;
658
659         read_lock(&obd_dev_lock);
660         i = class_uuid2dev_nolock(uuid);
661         read_unlock(&obd_dev_lock);
662
663         return i;
664 }
665 EXPORT_SYMBOL(class_uuid2dev);
666
667 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
668 {
669         int dev = class_uuid2dev(uuid);
670         if (dev < 0)
671                 return NULL;
672         return class_num2obd(dev);
673 }
674 EXPORT_SYMBOL(class_uuid2obd);
675
676 /**
677  * Get obd device from ::obd_devs[]
678  *
679  * \param num [in] array index
680  *
681  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
682  *         otherwise return the obd device there.
683  */
684 struct obd_device *class_num2obd(int num)
685 {
686         struct obd_device *obd = NULL;
687
688         if (num < class_devno_max()) {
689                 obd = obd_devs[num];
690                 if (obd == NULL)
691                         return NULL;
692
693                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
694                          "%p obd_magic %08x != %08x\n",
695                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
696                 LASSERTF(obd->obd_minor == num,
697                          "%p obd_minor %0d != %0d\n",
698                          obd, obd->obd_minor, num);
699         }
700
701         return obd;
702 }
703
704 /**
705  * Find obd in obd_dev[] by name or uuid.
706  *
707  * Increment obd's refcount if found.
708  *
709  * \param[in] str obd name or uuid
710  *
711  * \retval NULL    if not found
712  * \retval target  pointer to found obd_device
713  */
714 struct obd_device *class_dev_by_str(const char *str)
715 {
716         struct obd_device *target = NULL;
717         struct obd_uuid tgtuuid;
718         int rc;
719
720         obd_str2uuid(&tgtuuid, str);
721
722         read_lock(&obd_dev_lock);
723         rc = class_uuid2dev_nolock(&tgtuuid);
724         if (rc < 0)
725                 rc = class_name2dev_nolock(str);
726
727         if (rc >= 0)
728                 target = class_num2obd(rc);
729
730         if (target != NULL)
731                 class_incref(target, "find", current);
732         read_unlock(&obd_dev_lock);
733
734         RETURN(target);
735 }
736 EXPORT_SYMBOL(class_dev_by_str);
737
738 /**
739  * Get obd devices count. Device in any
740  *    state are counted
741  * \retval obd device count
742  */
743 int get_devices_count(void)
744 {
745         int index, max_index = class_devno_max(), dev_count = 0;
746
747         read_lock(&obd_dev_lock);
748         for (index = 0; index <= max_index; index++) {
749                 struct obd_device *obd = class_num2obd(index);
750                 if (obd != NULL)
751                         dev_count++;
752         }
753         read_unlock(&obd_dev_lock);
754
755         return dev_count;
756 }
757 EXPORT_SYMBOL(get_devices_count);
758
759 void class_obd_list(void)
760 {
761         char *status;
762         int i;
763
764         read_lock(&obd_dev_lock);
765         for (i = 0; i < class_devno_max(); i++) {
766                 struct obd_device *obd = class_num2obd(i);
767
768                 if (obd == NULL)
769                         continue;
770                 if (obd->obd_stopping)
771                         status = "ST";
772                 else if (obd->obd_set_up)
773                         status = "UP";
774                 else if (obd->obd_attached)
775                         status = "AT";
776                 else
777                         status = "--";
778                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
779                          i, status, obd->obd_type->typ_name,
780                          obd->obd_name, obd->obd_uuid.uuid,
781                          atomic_read(&obd->obd_refcount));
782         }
783         read_unlock(&obd_dev_lock);
784         return;
785 }
786
787 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
788    specified, then only the client with that uuid is returned,
789    otherwise any client connected to the tgt is returned. */
790 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
791                                           const char * typ_name,
792                                           struct obd_uuid *grp_uuid)
793 {
794         int i;
795
796         read_lock(&obd_dev_lock);
797         for (i = 0; i < class_devno_max(); i++) {
798                 struct obd_device *obd = class_num2obd(i);
799
800                 if (obd == NULL)
801                         continue;
802                 if ((strncmp(obd->obd_type->typ_name, typ_name,
803                              strlen(typ_name)) == 0)) {
804                         if (obd_uuid_equals(tgt_uuid,
805                                             &obd->u.cli.cl_target_uuid) &&
806                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
807                                                          &obd->obd_uuid) : 1)) {
808                                 read_unlock(&obd_dev_lock);
809                                 return obd;
810                         }
811                 }
812         }
813         read_unlock(&obd_dev_lock);
814
815         return NULL;
816 }
817 EXPORT_SYMBOL(class_find_client_obd);
818
819 /* Iterate the obd_device list looking devices have grp_uuid. Start
820    searching at *next, and if a device is found, the next index to look
821    at is saved in *next. If next is NULL, then the first matching device
822    will always be returned. */
823 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
824 {
825         int i;
826
827         if (next == NULL)
828                 i = 0;
829         else if (*next >= 0 && *next < class_devno_max())
830                 i = *next;
831         else
832                 return NULL;
833
834         read_lock(&obd_dev_lock);
835         for (; i < class_devno_max(); i++) {
836                 struct obd_device *obd = class_num2obd(i);
837
838                 if (obd == NULL)
839                         continue;
840                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
841                         if (next != NULL)
842                                 *next = i+1;
843                         read_unlock(&obd_dev_lock);
844                         return obd;
845                 }
846         }
847         read_unlock(&obd_dev_lock);
848
849         return NULL;
850 }
851 EXPORT_SYMBOL(class_devices_in_group);
852
853 /**
854  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
855  * adjust sptlrpc settings accordingly.
856  */
857 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
858 {
859         struct obd_device  *obd;
860         const char         *type;
861         int                 i, rc = 0, rc2;
862
863         LASSERT(namelen > 0);
864
865         read_lock(&obd_dev_lock);
866         for (i = 0; i < class_devno_max(); i++) {
867                 obd = class_num2obd(i);
868
869                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
870                         continue;
871
872                 /* only notify mdc, osc, osp, lwp, mdt, ost
873                  * because only these have a -sptlrpc llog */
874                 type = obd->obd_type->typ_name;
875                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
877                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
878                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
879                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
880                     strcmp(type, LUSTRE_OST_NAME) != 0)
881                         continue;
882
883                 if (strncmp(obd->obd_name, fsname, namelen))
884                         continue;
885
886                 class_incref(obd, __FUNCTION__, obd);
887                 read_unlock(&obd_dev_lock);
888                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
889                                          sizeof(KEY_SPTLRPC_CONF),
890                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
891                 rc = rc ? rc : rc2;
892                 class_decref(obd, __FUNCTION__, obd);
893                 read_lock(&obd_dev_lock);
894         }
895         read_unlock(&obd_dev_lock);
896         return rc;
897 }
898 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
899
900 void obd_cleanup_caches(void)
901 {
902         ENTRY;
903         if (obd_device_cachep) {
904                 kmem_cache_destroy(obd_device_cachep);
905                 obd_device_cachep = NULL;
906         }
907         if (obdo_cachep) {
908                 kmem_cache_destroy(obdo_cachep);
909                 obdo_cachep = NULL;
910         }
911         if (import_cachep) {
912                 kmem_cache_destroy(import_cachep);
913                 import_cachep = NULL;
914         }
915
916         EXIT;
917 }
918
919 int obd_init_caches(void)
920 {
921         int rc;
922         ENTRY;
923
924         LASSERT(obd_device_cachep == NULL);
925         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
926                                               sizeof(struct obd_device),
927                                               0, 0, NULL);
928         if (!obd_device_cachep)
929                 GOTO(out, rc = -ENOMEM);
930
931         LASSERT(obdo_cachep == NULL);
932         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
933                                         0, 0, NULL);
934         if (!obdo_cachep)
935                 GOTO(out, rc = -ENOMEM);
936
937         LASSERT(import_cachep == NULL);
938         import_cachep = kmem_cache_create("ll_import_cache",
939                                           sizeof(struct obd_import),
940                                           0, 0, NULL);
941         if (!import_cachep)
942                 GOTO(out, rc = -ENOMEM);
943
944         RETURN(0);
945 out:
946         obd_cleanup_caches();
947         RETURN(rc);
948 }
949
950 /* map connection to client */
951 struct obd_export *class_conn2export(struct lustre_handle *conn)
952 {
953         struct obd_export *export;
954         ENTRY;
955
956         if (!conn) {
957                 CDEBUG(D_CACHE, "looking for null handle\n");
958                 RETURN(NULL);
959         }
960
961         if (conn->cookie == -1) {  /* this means assign a new connection */
962                 CDEBUG(D_CACHE, "want a new connection\n");
963                 RETURN(NULL);
964         }
965
966         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
967         export = class_handle2object(conn->cookie, NULL);
968         RETURN(export);
969 }
970 EXPORT_SYMBOL(class_conn2export);
971
972 struct obd_device *class_exp2obd(struct obd_export *exp)
973 {
974         if (exp)
975                 return exp->exp_obd;
976         return NULL;
977 }
978 EXPORT_SYMBOL(class_exp2obd);
979
980 struct obd_device *class_conn2obd(struct lustre_handle *conn)
981 {
982         struct obd_export *export;
983         export = class_conn2export(conn);
984         if (export) {
985                 struct obd_device *obd = export->exp_obd;
986                 class_export_put(export);
987                 return obd;
988         }
989         return NULL;
990 }
991
992 struct obd_import *class_exp2cliimp(struct obd_export *exp)
993 {
994         struct obd_device *obd = exp->exp_obd;
995         if (obd == NULL)
996                 return NULL;
997         return obd->u.cli.cl_import;
998 }
999 EXPORT_SYMBOL(class_exp2cliimp);
1000
1001 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1002 {
1003         struct obd_device *obd = class_conn2obd(conn);
1004         if (obd == NULL)
1005                 return NULL;
1006         return obd->u.cli.cl_import;
1007 }
1008
1009 /* Export management functions */
1010 static void class_export_destroy(struct obd_export *exp)
1011 {
1012         struct obd_device *obd = exp->exp_obd;
1013         ENTRY;
1014
1015         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1016         LASSERT(obd != NULL);
1017
1018         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1019                exp->exp_client_uuid.uuid, obd->obd_name);
1020
1021         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1022         if (exp->exp_connection)
1023                 ptlrpc_put_connection_superhack(exp->exp_connection);
1024
1025         LASSERT(list_empty(&exp->exp_outstanding_replies));
1026         LASSERT(list_empty(&exp->exp_uncommitted_replies));
1027         LASSERT(list_empty(&exp->exp_req_replay_queue));
1028         LASSERT(list_empty(&exp->exp_hp_rpcs));
1029         obd_destroy_export(exp);
1030         /* self export doesn't hold a reference to an obd, although it
1031          * exists until freeing of the obd */
1032         if (exp != obd->obd_self_export)
1033                 class_decref(obd, "export", exp);
1034
1035         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1036         EXIT;
1037 }
1038
/* portals_handle_ops::hop_addref hook for exports: take one reference on
 * the export passed in as an untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
1043
/* Handle operations passed to class_handle_hash() for export handles:
 * references are taken through export_handle_addref(), and no free hook
 * is set. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
1048
1049 struct obd_export *class_export_get(struct obd_export *exp)
1050 {
1051         atomic_inc(&exp->exp_refcount);
1052         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1053                atomic_read(&exp->exp_refcount));
1054         return exp;
1055 }
1056 EXPORT_SYMBOL(class_export_get);
1057
1058 void class_export_put(struct obd_export *exp)
1059 {
1060         LASSERT(exp != NULL);
1061         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1062         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1063                atomic_read(&exp->exp_refcount) - 1);
1064
1065         if (atomic_dec_and_test(&exp->exp_refcount)) {
1066                 struct obd_device *obd = exp->exp_obd;
1067
1068                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1069                        exp, exp->exp_client_uuid.uuid);
1070
1071                 /* release nid stat refererence */
1072                 lprocfs_exp_cleanup(exp);
1073
1074                 if (exp == obd->obd_self_export) {
1075                         /* self export should be destroyed without
1076                          * zombie thread as it doesn't hold a
1077                          * reference to obd and doesn't hold any
1078                          * resources */
1079                         class_export_destroy(exp);
1080                         /* self export is destroyed, no class
1081                          * references exist and it is safe to free
1082                          * obd */
1083                         class_free_dev(obd);
1084                 } else {
1085                         LASSERT(!list_empty(&exp->exp_obd_chain));
1086                         obd_zombie_export_add(exp);
1087                 }
1088
1089         }
1090 }
1091 EXPORT_SYMBOL(class_export_put);
1092
1093 static void obd_zombie_exp_cull(struct work_struct *ws)
1094 {
1095         struct obd_export *export;
1096
1097         export = container_of(ws, struct obd_export, exp_zombie_work);
1098         class_export_destroy(export);
1099 }
1100
1101 /* Creates a new export, adds it to the hash table, and returns a
1102  * pointer to it. The refcount is 2: one for the hash reference, and
1103  * one for the pointer returned by this function. */
1104 struct obd_export *__class_new_export(struct obd_device *obd,
1105                                       struct obd_uuid *cluuid, bool is_self)
1106 {
1107         struct obd_export *export;
1108         struct cfs_hash *hash = NULL;
1109         int rc = 0;
1110         ENTRY;
1111
1112         OBD_ALLOC_PTR(export);
1113         if (!export)
1114                 return ERR_PTR(-ENOMEM);
1115
1116         export->exp_conn_cnt = 0;
1117         export->exp_lock_hash = NULL;
1118         export->exp_flock_hash = NULL;
1119         /* 2 = class_handle_hash + last */
1120         atomic_set(&export->exp_refcount, 2);
1121         atomic_set(&export->exp_rpc_count, 0);
1122         atomic_set(&export->exp_cb_count, 0);
1123         atomic_set(&export->exp_locks_count, 0);
1124 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1125         INIT_LIST_HEAD(&export->exp_locks_list);
1126         spin_lock_init(&export->exp_locks_list_guard);
1127 #endif
1128         atomic_set(&export->exp_replay_count, 0);
1129         export->exp_obd = obd;
1130         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1131         spin_lock_init(&export->exp_uncommitted_replies_lock);
1132         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1133         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1134         INIT_LIST_HEAD(&export->exp_handle.h_link);
1135         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1136         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1137         class_handle_hash(&export->exp_handle, &export_handle_ops);
1138         export->exp_last_request_time = ktime_get_real_seconds();
1139         spin_lock_init(&export->exp_lock);
1140         spin_lock_init(&export->exp_rpc_lock);
1141         INIT_HLIST_NODE(&export->exp_uuid_hash);
1142         INIT_HLIST_NODE(&export->exp_nid_hash);
1143         INIT_HLIST_NODE(&export->exp_gen_hash);
1144         spin_lock_init(&export->exp_bl_list_lock);
1145         INIT_LIST_HEAD(&export->exp_bl_list);
1146         INIT_LIST_HEAD(&export->exp_stale_list);
1147         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1148
1149         export->exp_sp_peer = LUSTRE_SP_ANY;
1150         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1151         export->exp_client_uuid = *cluuid;
1152         obd_init_export(export);
1153
1154         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1155                 spin_lock(&obd->obd_dev_lock);
1156                 /* shouldn't happen, but might race */
1157                 if (obd->obd_stopping)
1158                         GOTO(exit_unlock, rc = -ENODEV);
1159
1160                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1161                 if (hash == NULL)
1162                         GOTO(exit_unlock, rc = -ENODEV);
1163                 spin_unlock(&obd->obd_dev_lock);
1164
1165                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1166                 if (rc != 0) {
1167                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1168                                       obd->obd_name, cluuid->uuid, rc);
1169                         GOTO(exit_err, rc = -EALREADY);
1170                 }
1171         }
1172
1173         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1174         spin_lock(&obd->obd_dev_lock);
1175         if (obd->obd_stopping) {
1176                 if (hash)
1177                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1178                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1179         }
1180
1181         if (!is_self) {
1182                 class_incref(obd, "export", export);
1183                 list_add_tail(&export->exp_obd_chain_timed,
1184                               &obd->obd_exports_timed);
1185                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1186                 obd->obd_num_exports++;
1187         } else {
1188                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1189                 INIT_LIST_HEAD(&export->exp_obd_chain);
1190         }
1191         spin_unlock(&obd->obd_dev_lock);
1192         if (hash)
1193                 cfs_hash_putref(hash);
1194         RETURN(export);
1195
1196 exit_unlock:
1197         spin_unlock(&obd->obd_dev_lock);
1198 exit_err:
1199         if (hash)
1200                 cfs_hash_putref(hash);
1201         class_handle_unhash(&export->exp_handle);
1202         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1203         obd_destroy_export(export);
1204         OBD_FREE_PTR(export);
1205         return ERR_PTR(rc);
1206 }
1207
1208 struct obd_export *class_new_export(struct obd_device *obd,
1209                                     struct obd_uuid *uuid)
1210 {
1211         return __class_new_export(obd, uuid, false);
1212 }
1213 EXPORT_SYMBOL(class_new_export);
1214
1215 struct obd_export *class_new_export_self(struct obd_device *obd,
1216                                          struct obd_uuid *uuid)
1217 {
1218         return __class_new_export(obd, uuid, true);
1219 }
1220
1221 void class_unlink_export(struct obd_export *exp)
1222 {
1223         class_handle_unhash(&exp->exp_handle);
1224
1225         if (exp->exp_obd->obd_self_export == exp) {
1226                 class_export_put(exp);
1227                 return;
1228         }
1229
1230         spin_lock(&exp->exp_obd->obd_dev_lock);
1231         /* delete an uuid-export hashitem from hashtables */
1232         if (!hlist_unhashed(&exp->exp_uuid_hash))
1233                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1234                              &exp->exp_client_uuid,
1235                              &exp->exp_uuid_hash);
1236
1237 #ifdef HAVE_SERVER_SUPPORT
1238         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1239                 struct tg_export_data   *ted = &exp->exp_target_data;
1240                 struct cfs_hash         *hash;
1241
1242                 /* Because obd_gen_hash will not be released until
1243                  * class_cleanup(), so hash should never be NULL here */
1244                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1245                 LASSERT(hash != NULL);
1246                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1247                              &exp->exp_gen_hash);
1248                 cfs_hash_putref(hash);
1249         }
1250 #endif /* HAVE_SERVER_SUPPORT */
1251
1252         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1253         list_del_init(&exp->exp_obd_chain_timed);
1254         exp->exp_obd->obd_num_exports--;
1255         spin_unlock(&exp->exp_obd->obd_dev_lock);
1256         atomic_inc(&obd_stale_export_num);
1257
1258         /* A reference is kept by obd_stale_exports list */
1259         obd_stale_export_put(exp);
1260 }
1261 EXPORT_SYMBOL(class_unlink_export);
1262
1263 /* Import management functions */
1264 static void class_import_destroy(struct obd_import *imp)
1265 {
1266         ENTRY;
1267
1268         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1269                 imp->imp_obd->obd_name);
1270
1271         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1272
1273         ptlrpc_put_connection_superhack(imp->imp_connection);
1274
1275         while (!list_empty(&imp->imp_conn_list)) {
1276                 struct obd_import_conn *imp_conn;
1277
1278                 imp_conn = list_entry(imp->imp_conn_list.next,
1279                                       struct obd_import_conn, oic_item);
1280                 list_del_init(&imp_conn->oic_item);
1281                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1282                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1283         }
1284
1285         LASSERT(imp->imp_sec == NULL);
1286         class_decref(imp->imp_obd, "import", imp);
1287         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1288         EXIT;
1289 }
1290
/* portals_handle_ops::hop_addref hook for imports: take one reference on
 * the import passed in as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1295
/* Handle operations passed to class_handle_hash() for import handles:
 * references are taken through import_handle_addref(), and no free hook
 * is set. */
static struct portals_handle_ops import_handle_ops = {
        .hop_addref = import_handle_addref,
        .hop_free   = NULL,
};
1300
1301 struct obd_import *class_import_get(struct obd_import *import)
1302 {
1303         atomic_inc(&import->imp_refcount);
1304         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1305                atomic_read(&import->imp_refcount),
1306                import->imp_obd->obd_name);
1307         return import;
1308 }
1309 EXPORT_SYMBOL(class_import_get);
1310
1311 void class_import_put(struct obd_import *imp)
1312 {
1313         ENTRY;
1314
1315         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1316
1317         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1318                atomic_read(&imp->imp_refcount) - 1,
1319                imp->imp_obd->obd_name);
1320
1321         if (atomic_dec_and_test(&imp->imp_refcount)) {
1322                 CDEBUG(D_INFO, "final put import %p\n", imp);
1323                 obd_zombie_import_add(imp);
1324         }
1325
1326         /* catch possible import put race */
1327         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1328         EXIT;
1329 }
1330 EXPORT_SYMBOL(class_import_put);
1331
1332 static void init_imp_at(struct imp_at *at) {
1333         int i;
1334         at_init(&at->iat_net_latency, 0, 0);
1335         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1336                 /* max service estimates are tracked on the server side, so
1337                    don't use the AT history here, just use the last reported
1338                    val. (But keep hist for proc histogram, worst_ever) */
1339                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1340                         AT_FLG_NOHIST);
1341         }
1342 }
1343
1344 static void obd_zombie_imp_cull(struct work_struct *ws)
1345 {
1346         struct obd_import *import;
1347
1348         import = container_of(ws, struct obd_import, imp_zombie_work);
1349         class_import_destroy(import);
1350 }
1351
1352 struct obd_import *class_new_import(struct obd_device *obd)
1353 {
1354         struct obd_import *imp;
1355         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1356
1357         OBD_ALLOC(imp, sizeof(*imp));
1358         if (imp == NULL)
1359                 return NULL;
1360
1361         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1362         INIT_LIST_HEAD(&imp->imp_replay_list);
1363         INIT_LIST_HEAD(&imp->imp_sending_list);
1364         INIT_LIST_HEAD(&imp->imp_delayed_list);
1365         INIT_LIST_HEAD(&imp->imp_committed_list);
1366         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1367         imp->imp_known_replied_xid = 0;
1368         imp->imp_replay_cursor = &imp->imp_committed_list;
1369         spin_lock_init(&imp->imp_lock);
1370         imp->imp_last_success_conn = 0;
1371         imp->imp_state = LUSTRE_IMP_NEW;
1372         imp->imp_obd = class_incref(obd, "import", imp);
1373         mutex_init(&imp->imp_sec_mutex);
1374         init_waitqueue_head(&imp->imp_recovery_waitq);
1375         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1376
1377         if (curr_pid_ns->child_reaper)
1378                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1379         else
1380                 imp->imp_sec_refpid = 1;
1381
1382         atomic_set(&imp->imp_refcount, 2);
1383         atomic_set(&imp->imp_unregistering, 0);
1384         atomic_set(&imp->imp_inflight, 0);
1385         atomic_set(&imp->imp_replay_inflight, 0);
1386         atomic_set(&imp->imp_inval_count, 0);
1387         INIT_LIST_HEAD(&imp->imp_conn_list);
1388         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1389         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1390         init_imp_at(&imp->imp_at);
1391
1392         /* the default magic is V2, will be used in connect RPC, and
1393          * then adjusted according to the flags in request/reply. */
1394         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1395
1396         return imp;
1397 }
1398 EXPORT_SYMBOL(class_new_import);
1399
1400 void class_destroy_import(struct obd_import *import)
1401 {
1402         LASSERT(import != NULL);
1403         LASSERT(import != LP_POISON);
1404
1405         class_handle_unhash(&import->imp_handle);
1406
1407         spin_lock(&import->imp_lock);
1408         import->imp_generation++;
1409         spin_unlock(&import->imp_lock);
1410         class_import_put(import);
1411 }
1412 EXPORT_SYMBOL(class_destroy_import);
1413
1414 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1415
1416 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1417 {
1418         spin_lock(&exp->exp_locks_list_guard);
1419
1420         LASSERT(lock->l_exp_refs_nr >= 0);
1421
1422         if (lock->l_exp_refs_target != NULL &&
1423             lock->l_exp_refs_target != exp) {
1424                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1425                               exp, lock, lock->l_exp_refs_target);
1426         }
1427         if ((lock->l_exp_refs_nr ++) == 0) {
1428                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1429                 lock->l_exp_refs_target = exp;
1430         }
1431         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1432                lock, exp, lock->l_exp_refs_nr);
1433         spin_unlock(&exp->exp_locks_list_guard);
1434 }
1435 EXPORT_SYMBOL(__class_export_add_lock_ref);
1436
1437 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1438 {
1439         spin_lock(&exp->exp_locks_list_guard);
1440         LASSERT(lock->l_exp_refs_nr > 0);
1441         if (lock->l_exp_refs_target != exp) {
1442                 LCONSOLE_WARN("lock %p, "
1443                               "mismatching export pointers: %p, %p\n",
1444                               lock, lock->l_exp_refs_target, exp);
1445         }
1446         if (-- lock->l_exp_refs_nr == 0) {
1447                 list_del_init(&lock->l_exp_refs_link);
1448                 lock->l_exp_refs_target = NULL;
1449         }
1450         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1451                lock, exp, lock->l_exp_refs_nr);
1452         spin_unlock(&exp->exp_locks_list_guard);
1453 }
1454 EXPORT_SYMBOL(__class_export_del_lock_ref);
1455 #endif
1456
1457 /* A connection defines an export context in which preallocation can
1458    be managed. This releases the export pointer reference, and returns
1459    the export handle, so the export refcount is 1 when this function
1460    returns. */
1461 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1462                   struct obd_uuid *cluuid)
1463 {
1464         struct obd_export *export;
1465         LASSERT(conn != NULL);
1466         LASSERT(obd != NULL);
1467         LASSERT(cluuid != NULL);
1468         ENTRY;
1469
1470         export = class_new_export(obd, cluuid);
1471         if (IS_ERR(export))
1472                 RETURN(PTR_ERR(export));
1473
1474         conn->cookie = export->exp_handle.h_cookie;
1475         class_export_put(export);
1476
1477         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1478                cluuid->uuid, conn->cookie);
1479         RETURN(0);
1480 }
1481 EXPORT_SYMBOL(class_connect);
1482
1483 /* if export is involved in recovery then clean up related things */
1484 static void class_export_recovery_cleanup(struct obd_export *exp)
1485 {
1486         struct obd_device *obd = exp->exp_obd;
1487
1488         spin_lock(&obd->obd_recovery_task_lock);
1489         if (obd->obd_recovering) {
1490                 if (exp->exp_in_recovery) {
1491                         spin_lock(&exp->exp_lock);
1492                         exp->exp_in_recovery = 0;
1493                         spin_unlock(&exp->exp_lock);
1494                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1495                         atomic_dec(&obd->obd_connected_clients);
1496                 }
1497
1498                 /* if called during recovery then should update
1499                  * obd_stale_clients counter,
1500                  * lightweight exports are not counted */
1501                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1502                         exp->exp_obd->obd_stale_clients++;
1503         }
1504         spin_unlock(&obd->obd_recovery_task_lock);
1505
1506         spin_lock(&exp->exp_lock);
1507         /** Cleanup req replay fields */
1508         if (exp->exp_req_replay_needed) {
1509                 exp->exp_req_replay_needed = 0;
1510
1511                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1512                 atomic_dec(&obd->obd_req_replay_clients);
1513         }
1514
1515         /** Cleanup lock replay data */
1516         if (exp->exp_lock_replay_needed) {
1517                 exp->exp_lock_replay_needed = 0;
1518
1519                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1520                 atomic_dec(&obd->obd_lock_replay_clients);
1521         }
1522         spin_unlock(&exp->exp_lock);
1523 }
1524
1525 /* This function removes 1-3 references from the export:
1526  * 1 - for export pointer passed
1527  * and if disconnect really need
1528  * 2 - removing from hash
1529  * 3 - in client_unlink_export
1530  * The export pointer passed to this function can destroyed */
1531 int class_disconnect(struct obd_export *export)
1532 {
1533         int already_disconnected;
1534         ENTRY;
1535
1536         if (export == NULL) {
1537                 CWARN("attempting to free NULL export %p\n", export);
1538                 RETURN(-EINVAL);
1539         }
1540
1541         spin_lock(&export->exp_lock);
1542         already_disconnected = export->exp_disconnected;
1543         export->exp_disconnected = 1;
1544         /*  We hold references of export for uuid hash
1545          *  and nid_hash and export link at least. So
1546          *  it is safe to call cfs_hash_del in there.  */
1547         if (!hlist_unhashed(&export->exp_nid_hash))
1548                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1549                              &export->exp_connection->c_peer.nid,
1550                              &export->exp_nid_hash);
1551         spin_unlock(&export->exp_lock);
1552
1553         /* class_cleanup(), abort_recovery(), and class_fail_export()
1554          * all end up in here, and if any of them race we shouldn't
1555          * call extra class_export_puts(). */
1556         if (already_disconnected) {
1557                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1558                 GOTO(no_disconn, already_disconnected);
1559         }
1560
1561         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1562                export->exp_handle.h_cookie);
1563
1564         class_export_recovery_cleanup(export);
1565         class_unlink_export(export);
1566 no_disconn:
1567         class_export_put(export);
1568         RETURN(0);
1569 }
1570 EXPORT_SYMBOL(class_disconnect);
1571
1572 /* Return non-zero for a fully connected export */
1573 int class_connected_export(struct obd_export *exp)
1574 {
1575         int connected = 0;
1576
1577         if (exp) {
1578                 spin_lock(&exp->exp_lock);
1579                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1580                 spin_unlock(&exp->exp_lock);
1581         }
1582         return connected;
1583 }
1584 EXPORT_SYMBOL(class_connected_export);
1585
1586 static void class_disconnect_export_list(struct list_head *list,
1587                                          enum obd_option flags)
1588 {
1589         int rc;
1590         struct obd_export *exp;
1591         ENTRY;
1592
1593         /* It's possible that an export may disconnect itself, but
1594          * nothing else will be added to this list. */
1595         while (!list_empty(list)) {
1596                 exp = list_entry(list->next, struct obd_export,
1597                                  exp_obd_chain);
1598                 /* need for safe call CDEBUG after obd_disconnect */
1599                 class_export_get(exp);
1600
1601                 spin_lock(&exp->exp_lock);
1602                 exp->exp_flags = flags;
1603                 spin_unlock(&exp->exp_lock);
1604
1605                 if (obd_uuid_equals(&exp->exp_client_uuid,
1606                                     &exp->exp_obd->obd_uuid)) {
1607                         CDEBUG(D_HA,
1608                                "exp %p export uuid == obd uuid, don't discon\n",
1609                                exp);
1610                         /* Need to delete this now so we don't end up pointing
1611                          * to work_list later when this export is cleaned up. */
1612                         list_del_init(&exp->exp_obd_chain);
1613                         class_export_put(exp);
1614                         continue;
1615                 }
1616
1617                 class_export_get(exp);
1618                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1619                        "last request at %lld\n",
1620                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1621                        exp, exp->exp_last_request_time);
1622                 /* release one export reference anyway */
1623                 rc = obd_disconnect(exp);
1624
1625                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1626                        obd_export_nid2str(exp), exp, rc);
1627                 class_export_put(exp);
1628         }
1629         EXIT;
1630 }
1631
1632 void class_disconnect_exports(struct obd_device *obd)
1633 {
1634         struct list_head work_list;
1635         ENTRY;
1636
1637         /* Move all of the exports from obd_exports to a work list, en masse. */
1638         INIT_LIST_HEAD(&work_list);
1639         spin_lock(&obd->obd_dev_lock);
1640         list_splice_init(&obd->obd_exports, &work_list);
1641         list_splice_init(&obd->obd_delayed_exports, &work_list);
1642         spin_unlock(&obd->obd_dev_lock);
1643
1644         if (!list_empty(&work_list)) {
1645                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1646                        "disconnecting them\n", obd->obd_minor, obd);
1647                 class_disconnect_export_list(&work_list,
1648                                              exp_flags_from_obd(obd));
1649         } else
1650                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1651                        obd->obd_minor, obd);
1652         EXIT;
1653 }
1654 EXPORT_SYMBOL(class_disconnect_exports);
1655
1656 /* Remove exports that have not completed recovery.
1657  */
1658 void class_disconnect_stale_exports(struct obd_device *obd,
1659                                     int (*test_export)(struct obd_export *))
1660 {
1661         struct list_head work_list;
1662         struct obd_export *exp, *n;
1663         int evicted = 0;
1664         ENTRY;
1665
1666         INIT_LIST_HEAD(&work_list);
1667         spin_lock(&obd->obd_dev_lock);
1668         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1669                                  exp_obd_chain) {
1670                 /* don't count self-export as client */
1671                 if (obd_uuid_equals(&exp->exp_client_uuid,
1672                                     &exp->exp_obd->obd_uuid))
1673                         continue;
1674
1675                 /* don't evict clients which have no slot in last_rcvd
1676                  * (e.g. lightweight connection) */
1677                 if (exp->exp_target_data.ted_lr_idx == -1)
1678                         continue;
1679
1680                 spin_lock(&exp->exp_lock);
1681                 if (exp->exp_failed || test_export(exp)) {
1682                         spin_unlock(&exp->exp_lock);
1683                         continue;
1684                 }
1685                 exp->exp_failed = 1;
1686                 spin_unlock(&exp->exp_lock);
1687
1688                 list_move(&exp->exp_obd_chain, &work_list);
1689                 evicted++;
1690                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1691                        obd->obd_name, exp->exp_client_uuid.uuid,
1692                        obd_export_nid2str(exp));
1693                 print_export_data(exp, "EVICTING", 0, D_HA);
1694         }
1695         spin_unlock(&obd->obd_dev_lock);
1696
1697         if (evicted)
1698                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1699                               obd->obd_name, evicted);
1700
1701         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1702                                                  OBD_OPT_ABORT_RECOV);
1703         EXIT;
1704 }
1705 EXPORT_SYMBOL(class_disconnect_stale_exports);
1706
1707 void class_fail_export(struct obd_export *exp)
1708 {
1709         int rc, already_failed;
1710
1711         spin_lock(&exp->exp_lock);
1712         already_failed = exp->exp_failed;
1713         exp->exp_failed = 1;
1714         spin_unlock(&exp->exp_lock);
1715
1716         if (already_failed) {
1717                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1718                        exp, exp->exp_client_uuid.uuid);
1719                 return;
1720         }
1721
1722         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1723                exp, exp->exp_client_uuid.uuid);
1724
1725         if (obd_dump_on_timeout)
1726                 libcfs_debug_dumplog();
1727
1728         /* need for safe call CDEBUG after obd_disconnect */
1729         class_export_get(exp);
1730
1731         /* Most callers into obd_disconnect are removing their own reference
1732          * (request, for example) in addition to the one from the hash table.
1733          * We don't have such a reference here, so make one. */
1734         class_export_get(exp);
1735         rc = obd_disconnect(exp);
1736         if (rc)
1737                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1738         else
1739                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1740                        exp, exp->exp_client_uuid.uuid);
1741         class_export_put(exp);
1742 }
1743 EXPORT_SYMBOL(class_fail_export);
1744
1745 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1746 {
1747         struct cfs_hash *nid_hash;
1748         struct obd_export *doomed_exp = NULL;
1749         int exports_evicted = 0;
1750
1751         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1752
1753         spin_lock(&obd->obd_dev_lock);
1754         /* umount has run already, so evict thread should leave
1755          * its task to umount thread now */
1756         if (obd->obd_stopping) {
1757                 spin_unlock(&obd->obd_dev_lock);
1758                 return exports_evicted;
1759         }
1760         nid_hash = obd->obd_nid_hash;
1761         cfs_hash_getref(nid_hash);
1762         spin_unlock(&obd->obd_dev_lock);
1763
1764         do {
1765                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1766                 if (doomed_exp == NULL)
1767                         break;
1768
1769                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1770                          "nid %s found, wanted nid %s, requested nid %s\n",
1771                          obd_export_nid2str(doomed_exp),
1772                          libcfs_nid2str(nid_key), nid);
1773                 LASSERTF(doomed_exp != obd->obd_self_export,
1774                          "self-export is hashed by NID?\n");
1775                 exports_evicted++;
1776                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1777                               "request\n", obd->obd_name,
1778                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1779                               obd_export_nid2str(doomed_exp));
1780                 class_fail_export(doomed_exp);
1781                 class_export_put(doomed_exp);
1782         } while (1);
1783
1784         cfs_hash_putref(nid_hash);
1785
1786         if (!exports_evicted)
1787                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1788                        obd->obd_name, nid);
1789         return exports_evicted;
1790 }
1791 EXPORT_SYMBOL(obd_export_evict_by_nid);
1792
1793 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1794 {
1795         struct cfs_hash *uuid_hash;
1796         struct obd_export *doomed_exp = NULL;
1797         struct obd_uuid doomed_uuid;
1798         int exports_evicted = 0;
1799
1800         spin_lock(&obd->obd_dev_lock);
1801         if (obd->obd_stopping) {
1802                 spin_unlock(&obd->obd_dev_lock);
1803                 return exports_evicted;
1804         }
1805         uuid_hash = obd->obd_uuid_hash;
1806         cfs_hash_getref(uuid_hash);
1807         spin_unlock(&obd->obd_dev_lock);
1808
1809         obd_str2uuid(&doomed_uuid, uuid);
1810         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1811                 CERROR("%s: can't evict myself\n", obd->obd_name);
1812                 cfs_hash_putref(uuid_hash);
1813                 return exports_evicted;
1814         }
1815
1816         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1817
1818         if (doomed_exp == NULL) {
1819                 CERROR("%s: can't disconnect %s: no exports found\n",
1820                        obd->obd_name, uuid);
1821         } else {
1822                 CWARN("%s: evicting %s at adminstrative request\n",
1823                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1824                 class_fail_export(doomed_exp);
1825                 class_export_put(doomed_exp);
1826                 exports_evicted++;
1827         }
1828         cfs_hash_putref(uuid_hash);
1829
1830         return exports_evicted;
1831 }
1832
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback used by print_export_data() to dump additional
 * per-export state when it is asked to report locks. It stays NULL
 * until some other code installs a handler. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1837
1838 static void print_export_data(struct obd_export *exp, const char *status,
1839                               int locks, int debug_level)
1840 {
1841         struct ptlrpc_reply_state *rs;
1842         struct ptlrpc_reply_state *first_reply = NULL;
1843         int nreplies = 0;
1844
1845         spin_lock(&exp->exp_lock);
1846         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1847                             rs_exp_list) {
1848                 if (nreplies == 0)
1849                         first_reply = rs;
1850                 nreplies++;
1851         }
1852         spin_unlock(&exp->exp_lock);
1853
1854         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1855                "%p %s %llu stale:%d\n",
1856                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1857                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1858                atomic_read(&exp->exp_rpc_count),
1859                atomic_read(&exp->exp_cb_count),
1860                atomic_read(&exp->exp_locks_count),
1861                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1862                nreplies, first_reply, nreplies > 3 ? "..." : "",
1863                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1864 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1865         if (locks && class_export_dump_hook != NULL)
1866                 class_export_dump_hook(exp);
1867 #endif
1868 }
1869
1870 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1871 {
1872         struct obd_export *exp;
1873
1874         spin_lock(&obd->obd_dev_lock);
1875         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1876                 print_export_data(exp, "ACTIVE", locks, debug_level);
1877         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1878                 print_export_data(exp, "UNLINKED", locks, debug_level);
1879         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1880                 print_export_data(exp, "DELAYED", locks, debug_level);
1881         spin_unlock(&obd->obd_dev_lock);
1882 }
1883
1884 void obd_exports_barrier(struct obd_device *obd)
1885 {
1886         int waited = 2;
1887         LASSERT(list_empty(&obd->obd_exports));
1888         spin_lock(&obd->obd_dev_lock);
1889         while (!list_empty(&obd->obd_unlinked_exports)) {
1890                 spin_unlock(&obd->obd_dev_lock);
1891                 set_current_state(TASK_UNINTERRUPTIBLE);
1892                 schedule_timeout(cfs_time_seconds(waited));
1893                 if (waited > 5 && is_power_of_2(waited)) {
1894                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1895                                       "more than %d seconds. "
1896                                       "The obd refcount = %d. Is it stuck?\n",
1897                                       obd->obd_name, waited,
1898                                       atomic_read(&obd->obd_refcount));
1899                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1900                 }
1901                 waited *= 2;
1902                 spin_lock(&obd->obd_dev_lock);
1903         }
1904         spin_unlock(&obd->obd_dev_lock);
1905 }
1906 EXPORT_SYMBOL(obd_exports_barrier);
1907
1908 /**
1909  * Add export to the obd_zombe thread and notify it.
1910  */
1911 static void obd_zombie_export_add(struct obd_export *exp) {
1912         atomic_dec(&obd_stale_export_num);
1913         spin_lock(&exp->exp_obd->obd_dev_lock);
1914         LASSERT(!list_empty(&exp->exp_obd_chain));
1915         list_del_init(&exp->exp_obd_chain);
1916         spin_unlock(&exp->exp_obd->obd_dev_lock);
1917
1918         queue_work(zombie_wq, &exp->exp_zombie_work);
1919 }
1920
1921 /**
1922  * Add import to the obd_zombe thread and notify it.
1923  */
1924 static void obd_zombie_import_add(struct obd_import *imp) {
1925         LASSERT(imp->imp_sec == NULL);
1926
1927         queue_work(zombie_wq, &imp->imp_zombie_work);
1928 }
1929
1930 /**
1931  * wait when obd_zombie import/export queues become empty
1932  */
1933 void obd_zombie_barrier(void)
1934 {
1935         flush_workqueue(zombie_wq);
1936 }
1937 EXPORT_SYMBOL(obd_zombie_barrier);
1938
1939
1940 struct obd_export *obd_stale_export_get(void)
1941 {
1942         struct obd_export *exp = NULL;
1943         ENTRY;
1944
1945         spin_lock(&obd_stale_export_lock);
1946         if (!list_empty(&obd_stale_exports)) {
1947                 exp = list_entry(obd_stale_exports.next,
1948                                  struct obd_export, exp_stale_list);
1949                 list_del_init(&exp->exp_stale_list);
1950         }
1951         spin_unlock(&obd_stale_export_lock);
1952
1953         if (exp) {
1954                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1955                        atomic_read(&obd_stale_export_num));
1956         }
1957         RETURN(exp);
1958 }
1959 EXPORT_SYMBOL(obd_stale_export_get);
1960
1961 void obd_stale_export_put(struct obd_export *exp)
1962 {
1963         ENTRY;
1964
1965         LASSERT(list_empty(&exp->exp_stale_list));
1966         if (exp->exp_lock_hash &&
1967             atomic_read(&exp->exp_lock_hash->hs_count)) {
1968                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1969                        atomic_read(&obd_stale_export_num));
1970
1971                 spin_lock_bh(&exp->exp_bl_list_lock);
1972                 spin_lock(&obd_stale_export_lock);
1973                 /* Add to the tail if there is no blocked locks,
1974                  * to the head otherwise. */
1975                 if (list_empty(&exp->exp_bl_list))
1976                         list_add_tail(&exp->exp_stale_list,
1977                                       &obd_stale_exports);
1978                 else
1979                         list_add(&exp->exp_stale_list,
1980                                  &obd_stale_exports);
1981
1982                 spin_unlock(&obd_stale_export_lock);
1983                 spin_unlock_bh(&exp->exp_bl_list_lock);
1984         } else {
1985                 class_export_put(exp);
1986         }
1987         EXIT;
1988 }
1989 EXPORT_SYMBOL(obd_stale_export_put);
1990
1991 /**
1992  * Adjust the position of the export in the stale list,
1993  * i.e. move to the head of the list if is needed.
1994  **/
1995 void obd_stale_export_adjust(struct obd_export *exp)
1996 {
1997         LASSERT(exp != NULL);
1998         spin_lock_bh(&exp->exp_bl_list_lock);
1999         spin_lock(&obd_stale_export_lock);
2000
2001         if (!list_empty(&exp->exp_stale_list) &&
2002             !list_empty(&exp->exp_bl_list))
2003                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2004
2005         spin_unlock(&obd_stale_export_lock);
2006         spin_unlock_bh(&exp->exp_bl_list_lock);
2007 }
2008 EXPORT_SYMBOL(obd_stale_export_adjust);
2009
2010 /**
2011  * start destroy zombie import/export thread
2012  */
2013 int obd_zombie_impexp_init(void)
2014 {
2015         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2016         if (!zombie_wq)
2017                 return -ENOMEM;
2018
2019         return 0;
2020 }
2021
2022 /**
2023  * stop destroy zombie import/export thread
2024  */
2025 void obd_zombie_impexp_stop(void)
2026 {
2027         destroy_workqueue(zombie_wq);
2028         LASSERT(list_empty(&obd_stale_exports));
2029 }
2030
2031 /***** Kernel-userspace comm helpers *******/
2032
2033 /* Get length of entire message, including header */
2034 int kuc_len(int payload_len)
2035 {
2036         return sizeof(struct kuc_hdr) + payload_len;
2037 }
2038 EXPORT_SYMBOL(kuc_len);
2039
2040 /* Get a pointer to kuc header, given a ptr to the payload
2041  * @param p Pointer to payload area
2042  * @returns Pointer to kuc header
2043  */
2044 struct kuc_hdr * kuc_ptr(void *p)
2045 {
2046         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2047         LASSERT(lh->kuc_magic == KUC_MAGIC);
2048         return lh;
2049 }
2050 EXPORT_SYMBOL(kuc_ptr);
2051
2052 /* Alloc space for a message, and fill in header
2053  * @return Pointer to payload area
2054  */
2055 void *kuc_alloc(int payload_len, int transport, int type)
2056 {
2057         struct kuc_hdr *lh;
2058         int len = kuc_len(payload_len);
2059
2060         OBD_ALLOC(lh, len);
2061         if (lh == NULL)
2062                 return ERR_PTR(-ENOMEM);
2063
2064         lh->kuc_magic = KUC_MAGIC;
2065         lh->kuc_transport = transport;
2066         lh->kuc_msgtype = type;
2067         lh->kuc_msglen = len;
2068
2069         return (void *)(lh + 1);
2070 }
2071 EXPORT_SYMBOL(kuc_alloc);
2072
/* Free a message allocated by kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
2080
2081 struct obd_request_slot_waiter {
2082         struct list_head        orsw_entry;
2083         wait_queue_head_t       orsw_waitq;
2084         bool                    orsw_signaled;
2085 };
2086
2087 static bool obd_request_slot_avail(struct client_obd *cli,
2088                                    struct obd_request_slot_waiter *orsw)
2089 {
2090         bool avail;
2091
2092         spin_lock(&cli->cl_loi_list_lock);
2093         avail = !!list_empty(&orsw->orsw_entry);
2094         spin_unlock(&cli->cl_loi_list_lock);
2095
2096         return avail;
2097 };
2098
2099 /*
2100  * For network flow control, the RPC sponsor needs to acquire a credit
2101  * before sending the RPC. The credits count for a connection is defined
2102  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2103  * the subsequent RPC sponsors need to wait until others released their
2104  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2105  */
2106 int obd_get_request_slot(struct client_obd *cli)
2107 {
2108         struct obd_request_slot_waiter   orsw;
2109         struct l_wait_info               lwi;
2110         int                              rc;
2111
2112         spin_lock(&cli->cl_loi_list_lock);
2113         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2114                 cli->cl_rpcs_in_flight++;
2115                 spin_unlock(&cli->cl_loi_list_lock);
2116                 return 0;
2117         }
2118
2119         init_waitqueue_head(&orsw.orsw_waitq);
2120         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2121         orsw.orsw_signaled = false;
2122         spin_unlock(&cli->cl_loi_list_lock);
2123
2124         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2125         rc = l_wait_event(orsw.orsw_waitq,
2126                           obd_request_slot_avail(cli, &orsw) ||
2127                           orsw.orsw_signaled,
2128                           &lwi);
2129
2130         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2131          * freed but other (such as obd_put_request_slot) is using it. */
2132         spin_lock(&cli->cl_loi_list_lock);
2133         if (rc != 0) {
2134                 if (!orsw.orsw_signaled) {
2135                         if (list_empty(&orsw.orsw_entry))
2136                                 cli->cl_rpcs_in_flight--;
2137                         else
2138                                 list_del(&orsw.orsw_entry);
2139                 }
2140         }
2141
2142         if (orsw.orsw_signaled) {
2143                 LASSERT(list_empty(&orsw.orsw_entry));
2144
2145                 rc = -EINTR;
2146         }
2147         spin_unlock(&cli->cl_loi_list_lock);
2148
2149         return rc;
2150 }
2151 EXPORT_SYMBOL(obd_get_request_slot);
2152
2153 void obd_put_request_slot(struct client_obd *cli)
2154 {
2155         struct obd_request_slot_waiter *orsw;
2156
2157         spin_lock(&cli->cl_loi_list_lock);
2158         cli->cl_rpcs_in_flight--;
2159
2160         /* If there is free slot, wakeup the first waiter. */
2161         if (!list_empty(&cli->cl_flight_waiters) &&
2162             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2163                 orsw = list_entry(cli->cl_flight_waiters.next,
2164                                   struct obd_request_slot_waiter, orsw_entry);
2165                 list_del_init(&orsw->orsw_entry);
2166                 cli->cl_rpcs_in_flight++;
2167                 wake_up(&orsw->orsw_waitq);
2168         }
2169         spin_unlock(&cli->cl_loi_list_lock);
2170 }
2171 EXPORT_SYMBOL(obd_put_request_slot);
2172
2173 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2174 {
2175         return cli->cl_max_rpcs_in_flight;
2176 }
2177 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2178
2179 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2180 {
2181         struct obd_request_slot_waiter *orsw;
2182         __u32                           old;
2183         int                             diff;
2184         int                             i;
2185         char                            *typ_name;
2186         int                             rc;
2187
2188         if (max > OBD_MAX_RIF_MAX || max < 1)
2189                 return -ERANGE;
2190
2191         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2192         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2193                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2194                  * strictly lower that max_rpcs_in_flight */
2195                 if (max < 2) {
2196                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2197                                "because it must be higher than "
2198                                "max_mod_rpcs_in_flight value",
2199                                cli->cl_import->imp_obd->obd_name);
2200                         return -ERANGE;
2201                 }
2202                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2203                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2204                         if (rc != 0)
2205                                 return rc;
2206                 }
2207         }
2208
2209         spin_lock(&cli->cl_loi_list_lock);
2210         old = cli->cl_max_rpcs_in_flight;
2211         cli->cl_max_rpcs_in_flight = max;
2212         client_adjust_max_dirty(cli);
2213
2214         diff = max - old;
2215
2216         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2217         for (i = 0; i < diff; i++) {
2218                 if (list_empty(&cli->cl_flight_waiters))
2219                         break;
2220
2221                 orsw = list_entry(cli->cl_flight_waiters.next,
2222                                   struct obd_request_slot_waiter, orsw_entry);
2223                 list_del_init(&orsw->orsw_entry);
2224                 cli->cl_rpcs_in_flight++;
2225                 wake_up(&orsw->orsw_waitq);
2226         }
2227         spin_unlock(&cli->cl_loi_list_lock);
2228
2229         return 0;
2230 }
2231 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2232
2233 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2234 {
2235         return cli->cl_max_mod_rpcs_in_flight;
2236 }
2237 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2238
2239 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2240 {
2241         struct obd_connect_data *ocd;
2242         __u16 maxmodrpcs;
2243         __u16 prev;
2244
2245         if (max > OBD_MAX_RIF_MAX || max < 1)
2246                 return -ERANGE;
2247
2248         /* cannot exceed or equal max_rpcs_in_flight */
2249         if (max >= cli->cl_max_rpcs_in_flight) {
2250                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2251                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2252                        cli->cl_import->imp_obd->obd_name,
2253                        max, cli->cl_max_rpcs_in_flight);
2254                 return -ERANGE;
2255         }
2256
2257         /* cannot exceed max modify RPCs in flight supported by the server */
2258         ocd = &cli->cl_import->imp_connect_data;
2259         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2260                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2261         else
2262                 maxmodrpcs = 1;
2263         if (max > maxmodrpcs) {
2264                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2265                        "higher than max_mod_rpcs_per_client value (%hu) "
2266                        "returned by the server at connection\n",
2267                        cli->cl_import->imp_obd->obd_name,
2268                        max, maxmodrpcs);
2269                 return -ERANGE;
2270         }
2271
2272         spin_lock(&cli->cl_mod_rpcs_lock);
2273
2274         prev = cli->cl_max_mod_rpcs_in_flight;
2275         cli->cl_max_mod_rpcs_in_flight = max;
2276
2277         /* wakeup waiters if limit has been increased */
2278         if (cli->cl_max_mod_rpcs_in_flight > prev)
2279                 wake_up(&cli->cl_mod_rpcs_waitq);
2280
2281         spin_unlock(&cli->cl_mod_rpcs_lock);
2282
2283         return 0;
2284 }
2285 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2286
2287
2288 #define pct(a, b) (b ? a * 100 / b : 0)
2289 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2290                                struct seq_file *seq)
2291 {
2292         unsigned long mod_tot = 0, mod_cum;
2293         struct timespec64 now;
2294         int i;
2295
2296         ktime_get_real_ts64(&now);
2297
2298         spin_lock(&cli->cl_mod_rpcs_lock);
2299
2300         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2301                    (s64)now.tv_sec, now.tv_nsec);
2302         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2303                    cli->cl_mod_rpcs_in_flight);
2304
2305         seq_printf(seq, "\n\t\t\tmodify\n");
2306         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2307
2308         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2309
2310         mod_cum = 0;
2311         for (i = 0; i < OBD_HIST_MAX; i++) {
2312                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2313                 mod_cum += mod;
2314                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2315                            i, mod, pct(mod, mod_tot),
2316                            pct(mod_cum, mod_tot));
2317                 if (mod_cum == mod_tot)
2318                         break;
2319         }
2320
2321         spin_unlock(&cli->cl_mod_rpcs_lock);
2322
2323         return 0;
2324 }
2325 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2326 #undef pct
2327
2328
2329 /* The number of modify RPCs sent in parallel is limited
2330  * because the server has a finite number of slots per client to
2331  * store request result and ensure reply reconstruction when needed.
2332  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2333  * that takes into account server limit and cl_max_rpcs_in_flight
2334  * value.
2335  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2336  * one close request is allowed above the maximum.
2337  */
2338 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2339                                                  bool close_req)
2340 {
2341         bool avail;
2342
2343         /* A slot is available if
2344          * - number of modify RPCs in flight is less than the max
2345          * - it's a close RPC and no other close request is in flight
2346          */
2347         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2348                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2349
2350         return avail;
2351 }
2352
2353 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2354                                          bool close_req)
2355 {
2356         bool avail;
2357
2358         spin_lock(&cli->cl_mod_rpcs_lock);
2359         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2360         spin_unlock(&cli->cl_mod_rpcs_lock);
2361         return avail;
2362 }
2363
2364 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2365 {
2366         if (it != NULL &&
2367             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2368              it->it_op == IT_READDIR ||
2369              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2370                         return true;
2371         return false;
2372 }
2373
2374 /* Get a modify RPC slot from the obd client @cli according
2375  * to the kind of operation @opc that is going to be sent
2376  * and the intent @it of the operation if it applies.
2377  * If the maximum number of modify RPCs in flight is reached
2378  * the thread is put to sleep.
2379  * Returns the tag to be set in the request message. Tag 0
2380  * is reserved for non-modifying requests.
2381  */
2382 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2383                            struct lookup_intent *it)
2384 {
2385         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2386         bool                    close_req = false;
2387         __u16                   i, max;
2388
2389         /* read-only metadata RPCs don't consume a slot on MDT
2390          * for reply reconstruction
2391          */
2392         if (obd_skip_mod_rpc_slot(it))
2393                 return 0;
2394
2395         if (opc == MDS_CLOSE)
2396                 close_req = true;
2397
2398         do {
2399                 spin_lock(&cli->cl_mod_rpcs_lock);
2400                 max = cli->cl_max_mod_rpcs_in_flight;
2401                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2402                         /* there is a slot available */
2403                         cli->cl_mod_rpcs_in_flight++;
2404                         if (close_req)
2405                                 cli->cl_close_rpcs_in_flight++;
2406                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2407                                          cli->cl_mod_rpcs_in_flight);
2408                         /* find a free tag */
2409                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2410                                                 max + 1);
2411                         LASSERT(i < OBD_MAX_RIF_MAX);
2412                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2413                         spin_unlock(&cli->cl_mod_rpcs_lock);
2414                         /* tag 0 is reserved for non-modify RPCs */
2415                         return i + 1;
2416                 }
2417                 spin_unlock(&cli->cl_mod_rpcs_lock);
2418
2419                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2420                        "opc %u, max %hu\n",
2421                        cli->cl_import->imp_obd->obd_name, opc, max);
2422
2423                 l_wait_event(cli->cl_mod_rpcs_waitq,
2424                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2425         } while (true);
2426 }
2427 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2428
2429 /* Put a modify RPC slot from the obd client @cli according
2430  * to the kind of operation @opc that has been sent and the
2431  * intent @it of the operation if it applies.
2432  */
2433 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2434                           struct lookup_intent *it, __u16 tag)
2435 {
2436         bool                    close_req = false;
2437
2438         if (obd_skip_mod_rpc_slot(it))
2439                 return;
2440
2441         if (opc == MDS_CLOSE)
2442                 close_req = true;
2443
2444         spin_lock(&cli->cl_mod_rpcs_lock);
2445         cli->cl_mod_rpcs_in_flight--;
2446         if (close_req)
2447                 cli->cl_close_rpcs_in_flight--;
2448         /* release the tag in the bitmap */
2449         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2450         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2451         spin_unlock(&cli->cl_mod_rpcs_lock);
2452         wake_up(&cli->cl_mod_rpcs_waitq);
2453 }
2454 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2455