Whamcloud - gitweb
LU-4976 osp: add comment to osp_dev.c
[fs/lustre-release.git] / lustre / osp / osp_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_dev.c
37  *
38  * Lustre OST/MDT Proxy Device (OSP) is in the MDS stack, and used as
39  * a proxy device to communicate with other MDTs and OSTs.
40  *
41  * The purpose is to export the OSD API from the remote MDT or OST's underlying
42  * OSD for access and modification by the local service (typically MDD/LOD, or
43  * LFSCK) as if it were a local OSD. The goal is that the OSP provides a
44  * transparent interface for access to the remote OSD.
45  *
46  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
47  * Author: Mikhail Pershin <mike.pershin@intel.com>
48  * Author: Di Wang <di.wang@intel.com>
49  */
50
51 #define DEBUG_SUBSYSTEM S_MDS
52
53 #include <obd_class.h>
54 #include <lustre_ioctl.h>
55 #include <lustre_param.h>
56 #include <lustre_log.h>
57 #include <lustre_mdc.h>
58
59 #include "osp_internal.h"
60
61 /* Slab for OSP object allocation */
62 struct kmem_cache *osp_object_kmem;
63
64 static struct lu_kmem_descr osp_caches[] = {
65         {
66                 .ckd_cache = &osp_object_kmem,
67                 .ckd_name  = "osp_obj",
68                 .ckd_size  = sizeof(struct osp_object)
69         },
70         {
71                 .ckd_cache = NULL
72         }
73 };
74
75 /**
76  * Implementation of lu_device_operations::ldo_object_alloc
77  *
78  * Allocates an OSP object in memory, whose FID is on the remote target.
79  *
80  * \param[in] env       execution environment
81  * \param[in] hdr       The header of the object stack. If it is NULL, it
82  *                      means the object is not built from top device, i.e.
83  *                      it is a sub-stripe object of striped directory or
84  *                      an OST object.
85  * \param[in] d         OSP device
86  *
87  * \retval object       object being created if the creation succeed.
88  * \retval NULL         NULL if the creation failed.
89  */
90 struct lu_object *osp_object_alloc(const struct lu_env *env,
91                                    const struct lu_object_header *hdr,
92                                    struct lu_device *d)
93 {
94         struct lu_object_header *h = NULL;
95         struct osp_object       *o;
96         struct lu_object        *l;
97
98         OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, GFP_NOFS);
99         if (o != NULL) {
100                 l = &o->opo_obj.do_lu;
101
102                 /* If hdr is NULL, it means the object is not built
103                  * from the top dev(MDT/OST), usually it happens when
104                  * building striped object, like data object on MDT or
105                  * striped object for directory */
106                 if (hdr == NULL) {
107                         h = &o->opo_header;
108                         lu_object_header_init(h);
109                         dt_object_init(&o->opo_obj, h, d);
110                         lu_object_add_top(h, l);
111                 } else {
112                         dt_object_init(&o->opo_obj, h, d);
113                 }
114
115                 l->lo_ops = &osp_lu_obj_ops;
116
117                 return l;
118         } else {
119                 return NULL;
120         }
121 }
122
123 /**
124  * Find or create the local object
125  *
126  * Finds or creates the local file referenced by \a reg_id and return the
127  * attributes of the local file.
128  *
129  * \param[in] env       execution environment
130  * \param[in] osp       OSP device
131  * \param[out] attr     attributes of the object
132  * \param[in] reg_id    the local object ID of the file. It will be used
133  *                      to compose a local FID{FID_SEQ_LOCAL_FILE, reg_id, 0}
134  *                      to identify the object.
135  *
136  * \retval object               object(dt_object) found or created
137  * \retval ERR_PTR(errno)       ERR_PTR(errno) if not get the object.
138  */
139 static struct dt_object
140 *osp_find_or_create_local_file(const struct lu_env *env, struct osp_device *osp,
141                                struct lu_attr *attr, __u32 reg_id)
142 {
143         struct osp_thread_info *osi = osp_env_info(env);
144         struct dt_object_format dof = { 0 };
145         struct dt_object       *dto;
146         int                  rc;
147         ENTRY;
148
149         lu_local_obj_fid(&osi->osi_fid, reg_id);
150         attr->la_valid = LA_MODE;
151         attr->la_mode = S_IFREG | 0644;
152         dof.dof_type = DFT_REGULAR;
153         /* Find or create the local object by osi_fid. */
154         dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid,
155                                 &dof, attr);
156         if (IS_ERR(dto))
157                 RETURN(dto);
158
159         /* Get attributes of the local object. */
160         rc = dt_attr_get(env, dto, attr, NULL);
161         if (rc) {
162                 CERROR("%s: can't be initialized: rc = %d\n",
163                        osp->opd_obd->obd_name, rc);
164                 lu_object_put(env, &dto->do_lu);
165                 RETURN(ERR_PTR(rc));
166         }
167         RETURN(dto);
168 }
169
170 /**
171  * Write data buffer to a local file object.
172  *
173  * \param[in] env       execution environment
174  * \param[in] osp       OSP device
175  * \param[in] dt_obj    object written to
176  * \param[in] buf       buffer containing byte array and length
177  * \param[in] offset    write offset in the object in bytes
178  *
179  * \retval 0            0 if write succeed
180  * \retval -EFAULT      -EFAULT if only part of buffer is written.
181  * \retval negative             other negative errno if write failed.
182  */
183 static int osp_write_local_file(const struct lu_env *env,
184                                 struct osp_device *osp,
185                                 struct dt_object *dt_obj,
186                                 struct lu_buf *buf,
187                                 loff_t offset)
188 {
189         struct thandle *th;
190         int rc;
191
192         th = dt_trans_create(env, osp->opd_storage);
193         if (IS_ERR(th))
194                 RETURN(PTR_ERR(th));
195
196         rc = dt_declare_record_write(env, dt_obj, buf, offset, th);
197         if (rc)
198                 GOTO(out, rc);
199         rc = dt_trans_start_local(env, osp->opd_storage, th);
200         if (rc)
201                 GOTO(out, rc);
202
203         rc = dt_record_write(env, dt_obj, buf, &offset, th);
204 out:
205         dt_trans_stop(env, osp->opd_storage, th);
206         RETURN(rc);
207 }
208
209 /**
210  * Initialize last ID object.
211  *
212  * This function initializes the LAST_ID file, which stores the current last
213  * used id of data objects. The MDT will use the last used id and the last_seq
214  * (\see osp_init_last_seq()) to synchronize the precreate object cache with
215  * OSTs.
216  *
217  * \param[in] env       execution environment
218  * \param[in] osp       OSP device
219  *
220  * \retval 0            0 if initialization succeed
221  * \retval negative     negative errno if initialization failed
222  */
223 static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp)
224 {
225         struct osp_thread_info  *osi = osp_env_info(env);
226         struct lu_fid           *fid = &osp->opd_last_used_fid;
227         struct dt_object        *dto;
228         int                     rc;
229         ENTRY;
230
231         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
232                                             MDD_LOV_OBJ_OID);
233         if (IS_ERR(dto))
234                 RETURN(PTR_ERR(dto));
235
236         /* object will be released in device cleanup path */
237         if (osi->osi_attr.la_size >=
238             sizeof(osi->osi_id) * (osp->opd_index + 1)) {
239                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
240                                    osp->opd_index);
241                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
242                 if (rc != 0)
243                         GOTO(out, rc);
244         } else {
245                 fid->f_oid = 0;
246                 osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_oid,
247                                    osp->opd_index);
248                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
249                                           osi->osi_off);
250                 if (rc != 0)
251                         GOTO(out, rc);
252         }
253         osp->opd_last_used_oid_file = dto;
254         RETURN(0);
255 out:
256         /* object will be released in device cleanup path */
257         CERROR("%s: can't initialize lov_objid: rc = %d\n",
258                osp->opd_obd->obd_name, rc);
259         lu_object_put(env, &dto->do_lu);
260         osp->opd_last_used_oid_file = NULL;
261         RETURN(rc);
262 }
263
264 /**
265  * Initialize last sequence object.
266  *
267  * This function initializes the LAST_SEQ file in the local OSD, which stores
268  * the current last used sequence of data objects. The MDT will use the last
269  * sequence and last id (\see osp_init_last_objid()) to synchronize the
270  * precreate object cache with OSTs.
271  *
272  * \param[in] env       execution environment
273  * \param[in] osp       OSP device
274  *
275  * \retval 0            0 if initialization succeed
276  * \retval negative     negative errno if initialization failed
277  */
278 static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp)
279 {
280         struct osp_thread_info  *osi = osp_env_info(env);
281         struct lu_fid           *fid = &osp->opd_last_used_fid;
282         struct dt_object        *dto;
283         int                     rc;
284         ENTRY;
285
286         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
287                                             MDD_LOV_OBJ_OSEQ);
288         if (IS_ERR(dto))
289                 RETURN(PTR_ERR(dto));
290
291         /* object will be released in device cleanup path */
292         if (osi->osi_attr.la_size >=
293             sizeof(osi->osi_id) * (osp->opd_index + 1)) {
294                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
295                                    osp->opd_index);
296                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
297                 if (rc != 0)
298                         GOTO(out, rc);
299         } else {
300                 fid->f_seq = 0;
301                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
302                                     osp->opd_index);
303                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
304                                           osi->osi_off);
305         }
306         osp->opd_last_used_seq_file = dto;
307         RETURN(0);
308 out:
309         /* object will be released in device cleanup path */
310         CERROR("%s: can't initialize lov_seq: rc = %d\n",
311                osp->opd_obd->obd_name, rc);
312         lu_object_put(env, &dto->do_lu);
313         osp->opd_last_used_seq_file = NULL;
314         RETURN(rc);
315 }
316
317 /**
318  * Initialize last OID and sequence object.
319  *
320  * If the MDT is just upgraded to 2.4 from the lower version, where the
321  * LAST_SEQ file does not exist, the file will be created and IDIF sequence
322  * will be written into the file.
323  *
324  * \param[in] env       execution environment
325  * \param[in] osp       OSP device
326  *
327  * \retval 0            0 if initialization succeed
328  * \retval negative     negative error if initialization failed
329  */
330 static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp)
331 {
332         struct osp_thread_info *osi = osp_env_info(env);
333         int                  rc;
334         ENTRY;
335
336         fid_zero(&osp->opd_last_used_fid);
337         rc = osp_init_last_objid(env, osp);
338         if (rc < 0) {
339                 CERROR("%s: Can not get ids %d from old objid!\n",
340                        osp->opd_obd->obd_name, rc);
341                 RETURN(rc);
342         }
343
344         rc = osp_init_last_seq(env, osp);
345         if (rc < 0) {
346                 CERROR("%s: Can not get ids %d from old objid!\n",
347                        osp->opd_obd->obd_name, rc);
348                 GOTO(out, rc);
349         }
350
351         if (fid_oid(&osp->opd_last_used_fid) != 0 &&
352             fid_seq(&osp->opd_last_used_fid) == 0) {
353                 /* Just upgrade from the old version,
354                  * set the seq to be IDIF */
355                 osp->opd_last_used_fid.f_seq =
356                    fid_idif_seq(fid_oid(&osp->opd_last_used_fid),
357                                 osp->opd_index);
358                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off,
359                                     &osp->opd_last_used_fid.f_seq,
360                                     osp->opd_index);
361                 rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file,
362                                           &osi->osi_lb, osi->osi_off);
363                 if (rc) {
364                         CERROR("%s : Can not write seq file: rc = %d\n",
365                                osp->opd_obd->obd_name, rc);
366                         GOTO(out, rc);
367                 }
368         }
369
370         if (!fid_is_zero(&osp->opd_last_used_fid) &&
371                  !fid_is_sane(&osp->opd_last_used_fid)) {
372                 CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name,
373                         PFID(&osp->opd_last_used_fid));
374                 GOTO(out, rc = -EINVAL);
375         }
376
377         CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n",
378                osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid));
379 out:
380         if (rc != 0) {
381                 if (osp->opd_last_used_oid_file != NULL) {
382                         lu_object_put(env, &osp->opd_last_used_oid_file->do_lu);
383                         osp->opd_last_used_oid_file = NULL;
384                 }
385                 if (osp->opd_last_used_seq_file != NULL) {
386                         lu_object_put(env, &osp->opd_last_used_seq_file->do_lu);
387                         osp->opd_last_used_seq_file = NULL;
388                 }
389         }
390
391         RETURN(rc);
392 }
393
394 /**
395  * Release the last sequence and OID file objects in OSP device.
396  *
397  * \param[in] env       execution environment
398  * \param[in] osp       OSP device
399  */
400 static void osp_last_used_fini(const struct lu_env *env, struct osp_device *osp)
401 {
402         /* release last_used file */
403         if (osp->opd_last_used_oid_file != NULL) {
404                 lu_object_put(env, &osp->opd_last_used_oid_file->do_lu);
405                 osp->opd_last_used_oid_file = NULL;
406         }
407
408         if (osp->opd_last_used_seq_file != NULL) {
409                 lu_object_put(env, &osp->opd_last_used_seq_file->do_lu);
410                 osp->opd_last_used_seq_file = NULL;
411         }
412 }
413
414 /**
415  * Disconnects the connection between OSP and its correspondent MDT or OST, and
416  * the import will be marked as inactive. It will only be called during OSP
417  * cleanup process.
418  *
419  * \param[in] d         OSP device being disconnected
420  *
421  * \retval 0            0 if disconnection succeed
422  * \retval negative     negative errno if disconnection failed
423  */
424 static int osp_disconnect(struct osp_device *d)
425 {
426         struct obd_import *imp;
427         int rc = 0;
428
429         imp = d->opd_obd->u.cli.cl_import;
430
431         /* Mark import deactivated now, so we don't try to reconnect if any
432          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
433          * fully deactivate the import, or that would drop all requests. */
434         LASSERT(imp != NULL);
435         spin_lock(&imp->imp_lock);
436         imp->imp_deactive = 1;
437         spin_unlock(&imp->imp_lock);
438
439         ptlrpc_deactivate_import(imp);
440
441         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
442          * delete it regardless.  (It's safe to delete an import that was
443          * never added.) */
444         (void)ptlrpc_pinger_del_import(imp);
445
446         rc = ptlrpc_disconnect_import(imp, 0);
447         if (rc != 0)
448                 CERROR("%s: can't disconnect: rc = %d\n",
449                        d->opd_obd->obd_name, rc);
450
451         ptlrpc_invalidate_import(imp);
452
453         RETURN(rc);
454 }
455
456 /**
457  * Cleanup OSP, which includes disconnect import, cleanup unlink log, stop
458  * precreate threads etc.
459  *
460  * \param[in] env       execution environment.
461  * \param[in] d         OSP device being disconnected.
462  *
463  * \retval 0            0 if cleanup succeed
464  * \retval negative     negative errno if cleanup failed
465  */
466 static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
467 {
468         int                      rc = 0;
469         ENTRY;
470
471         LASSERT(env);
472
473         rc = osp_disconnect(d);
474
475         osp_sync_fini(d);
476
477         if (!d->opd_connect_mdt) {
478                 /* stop precreate thread */
479                 osp_precreate_fini(d);
480
481                 /* release last_used file */
482                 osp_last_used_fini(env, d);
483         }
484
485         obd_fid_fini(d->opd_obd);
486
487         RETURN(rc);
488 }
489
490 /**
491  * Implementation of osp_lu_ops::ldo_process_config
492  *
493  * This function processes config log records in OSP layer. It is usually
494  * called from the top layer of MDT stack, and goes through the stack by calling
495  * ldo_process_config of next layer.
496  *
497  * \param[in] env       execution environment
498  * \param[in] dev       lu_device of OSP
499  * \param[in] lcfg      config log
500  *
501  * \retval 0            0 if the config log record is executed correctly.
502  * \retval negative     negative errno if the record execution fails.
503  */
504 static int osp_process_config(const struct lu_env *env,
505                               struct lu_device *dev, struct lustre_cfg *lcfg)
506 {
507         struct osp_device               *d = lu2osp_dev(dev);
508         struct obd_device               *obd = d->opd_obd;
509         int                              rc;
510
511         ENTRY;
512
513         switch (lcfg->lcfg_command) {
514         case LCFG_PRE_CLEANUP:
515                 rc = osp_disconnect(d);
516                 break;
517         case LCFG_CLEANUP:
518                 lu_dev_del_linkage(dev->ld_site, dev);
519                 rc = osp_shutdown(env, d);
520                 break;
521         case LCFG_PARAM:
522                 LASSERT(obd);
523                 rc = class_process_proc_seq_param(PARAM_OSC, obd->obd_vars,
524                                                   lcfg, obd);
525                 if (rc > 0)
526                         rc = 0;
527                 if (rc == -ENOSYS) {
528                         /* class_process_proc_param() haven't found matching
529                          * parameter and returned ENOSYS so that layer(s)
530                          * below could use that. But OSP is the bottom, so
531                          * just ignore it */
532                         CERROR("%s: unknown param %s\n",
533                                (char *)lustre_cfg_string(lcfg, 0),
534                                (char *)lustre_cfg_string(lcfg, 1));
535                         rc = 0;
536                 }
537                 break;
538         default:
539                 CERROR("%s: unknown command %u\n",
540                        (char *)lustre_cfg_string(lcfg, 0), lcfg->lcfg_command);
541                 rc = 0;
542                 break;
543         }
544
545         RETURN(rc);
546 }
547
548 /**
549  * Implementation of osp_lu_ops::ldo_recovery_complete
550  *
551  * This function is called after recovery is finished, and OSP layer
552  * will wake up precreate thread here.
553  *
554  * \param[in] env       execution environment
555  * \param[in] dev       lu_device of OSP
556  *
557  * \retval 0            0 unconditionally
558  */
559 static int osp_recovery_complete(const struct lu_env *env,
560                                  struct lu_device *dev)
561 {
562         struct osp_device       *osp = lu2osp_dev(dev);
563
564         ENTRY;
565         osp->opd_recovery_completed = 1;
566
567         if (!osp->opd_connect_mdt && osp->opd_pre != NULL)
568                 wake_up(&osp->opd_pre_waitq);
569
570         RETURN(0);
571 }
572
573 const struct lu_device_operations osp_lu_ops = {
574         .ldo_object_alloc       = osp_object_alloc,
575         .ldo_process_config     = osp_process_config,
576         .ldo_recovery_complete  = osp_recovery_complete,
577 };
578
579 /**
580  * Implementation of dt_device_operations::dt_statfs
581  *
582  * This function provides statfs status (for precreation) from
583  * corresponding OST. Note: this function only retrieves the status
584  * from the OSP device, and the real statfs RPC happens inside
585  * precreate thread (\see osp_statfs_update). Note: OSP for MDT does
586  * not need to retrieve statfs data for now.
587  *
588  * \param[in] env       execution environment.
589  * \param[in] dev       dt_device of OSP.
590  * \param[out] sfs      holds the retrieved statfs data.
591  *
592  * \retval 0            0 statfs data was retrieved successfully or
593  *                      retrieval was not needed
594  * \retval negative     negative errno if get statfs failed.
595  */
596 static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
597                       struct obd_statfs *sfs)
598 {
599         struct osp_device *d = dt2osp_dev(dev);
600
601         ENTRY;
602
603         if (unlikely(d->opd_imp_active == 0))
604                 RETURN(-ENOTCONN);
605
606         if (d->opd_pre == NULL)
607                 RETURN(0);
608
609         /* return recently updated data */
610         *sfs = d->opd_statfs;
611
612         /*
613          * layer above osp (usually lod) can use ffree to estimate
614          * how many objects are available for immediate creation
615          */
616         spin_lock(&d->opd_pre_lock);
617         LASSERTF(fid_seq(&d->opd_pre_last_created_fid) ==
618                  fid_seq(&d->opd_pre_used_fid),
619                  "last_created "DFID", next_fid "DFID"\n",
620                  PFID(&d->opd_pre_last_created_fid),
621                  PFID(&d->opd_pre_used_fid));
622         sfs->os_fprecreated = fid_oid(&d->opd_pre_last_created_fid) -
623                               fid_oid(&d->opd_pre_used_fid);
624         sfs->os_fprecreated -= d->opd_pre_reserved;
625         spin_unlock(&d->opd_pre_lock);
626
627         LASSERT(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2);
628
629         CDEBUG(D_OTHER, "%s: "LPU64" blocks, "LPU64" free, "LPU64" avail, "
630                LPU64" files, "LPU64" free files\n", d->opd_obd->obd_name,
631                sfs->os_blocks, sfs->os_bfree, sfs->os_bavail,
632                sfs->os_files, sfs->os_ffree);
633         RETURN(0);
634 }
635
636 static int osp_sync_timeout(void *data)
637 {
638         return 1;
639 }
640
641 /**
642  * Implementation of dt_device_operations::dt_sync
643  *
644  * This function synchronizes the OSP cache to the remote target. It wakes
645  * up unlink log threads and sends out unlink records to the remote OST.
646  *
647  * \param[in] env       execution environment
648  * \param[in] dev       dt_device of OSP
649  *
650  * \retval 0            0 if synchronization succeeds
651  * \retval negative     negative errno if synchronization fails
652  */
653 static int osp_sync(const struct lu_env *env, struct dt_device *dev)
654 {
655         struct osp_device *d = dt2osp_dev(dev);
656         cfs_time_t         expire;
657         struct l_wait_info lwi = { 0 };
658         unsigned long      id, old;
659         int                rc = 0;
660         unsigned long      start = cfs_time_current();
661         ENTRY;
662
663         if (unlikely(d->opd_imp_active == 0))
664                 RETURN(-ENOTCONN);
665
666         id = d->opd_syn_last_used_id;
667
668         CDEBUG(D_OTHER, "%s: id: used %lu, processed %lu\n",
669                d->opd_obd->obd_name, id, d->opd_syn_last_processed_id);
670
671         /* wait till all-in-line are processed */
672         while (d->opd_syn_last_processed_id < id) {
673
674                 old = d->opd_syn_last_processed_id;
675
676                 /* make sure the connection is fine */
677                 expire = cfs_time_shift(obd_timeout);
678                 lwi = LWI_TIMEOUT(expire - cfs_time_current(),
679                                   osp_sync_timeout, d);
680                 l_wait_event(d->opd_syn_barrier_waitq,
681                              d->opd_syn_last_processed_id >= id,
682                              &lwi);
683
684                 if (d->opd_syn_last_processed_id >= id)
685                         break;
686
687                 if (d->opd_syn_last_processed_id != old) {
688                         /* some progress have been made,
689                          * keep trying... */
690                         continue;
691                 }
692
693                 /* no changes and expired, something is wrong */
694                 GOTO(out, rc = -ETIMEDOUT);
695         }
696
697         /* block new processing (barrier>0 - few callers are possible */
698         atomic_inc(&d->opd_syn_barrier);
699
700         CDEBUG(D_OTHER, "%s: %u in flight\n", d->opd_obd->obd_name,
701                d->opd_syn_rpc_in_flight);
702
703         /* wait till all-in-flight are replied, so executed by the target */
704         /* XXX: this is used by LFSCK at the moment, which doesn't require
705          *      all the changes to be committed, but in general it'd be
706          *      better to wait till commit */
707         while (d->opd_syn_rpc_in_flight > 0) {
708
709                 old = d->opd_syn_rpc_in_flight;
710
711                 expire = cfs_time_shift(obd_timeout);
712                 lwi = LWI_TIMEOUT(expire - cfs_time_current(),
713                                   osp_sync_timeout, d);
714                 l_wait_event(d->opd_syn_barrier_waitq,
715                                 d->opd_syn_rpc_in_flight == 0, &lwi);
716
717                 if (d->opd_syn_rpc_in_flight == 0)
718                         break;
719
720                 if (d->opd_syn_rpc_in_flight != old) {
721                         /* some progress have been made */
722                         continue;
723                 }
724
725                 /* no changes and expired, something is wrong */
726                 GOTO(out, rc = -ETIMEDOUT);
727         }
728
729         CDEBUG(D_OTHER, "%s: done in %lu\n", d->opd_obd->obd_name,
730                cfs_time_current() - start);
731 out:
732         /* resume normal processing (barrier=0) */
733         atomic_dec(&d->opd_syn_barrier);
734         __osp_sync_check_for_work(d);
735
736         RETURN(rc);
737 }
738
739 const struct dt_device_operations osp_dt_ops = {
740         .dt_statfs       = osp_statfs,
741         .dt_sync         = osp_sync,
742         .dt_trans_create = osp_trans_create,
743         .dt_trans_start  = osp_trans_start,
744         .dt_trans_stop   = osp_trans_stop,
745 };
746
747 /**
748  * Connect OSP to local OSD.
749  *
750  * Locate the local OSD referenced by \a nextdev and connect to it. Sometimes,
751  * OSP needs to access the local OSD to store some information. For example,
752  * during precreate, it needs to update last used OID and sequence file
753  * (LAST_SEQ) in local OSD.
754  *
755  * \param[in] env       execution environment
756  * \param[in] osp       OSP device
757  * \param[in] nextdev   the name of local OSD
758  *
759  * \retval 0            0 connection succeeded
760  * \retval negative     negative errno connection failed
761  */
762 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *osp,
763                               const char *nextdev)
764 {
765         struct obd_connect_data *data = NULL;
766         struct obd_device       *obd;
767         int                      rc;
768
769         ENTRY;
770
771         LASSERT(osp->opd_storage_exp == NULL);
772
773         OBD_ALLOC_PTR(data);
774         if (data == NULL)
775                 RETURN(-ENOMEM);
776
777         obd = class_name2obd(nextdev);
778         if (obd == NULL) {
779                 CERROR("%s: can't locate next device: %s\n",
780                        osp->opd_obd->obd_name, nextdev);
781                 GOTO(out, rc = -ENOTCONN);
782         }
783
784         rc = obd_connect(env, &osp->opd_storage_exp, obd, &obd->obd_uuid, data,
785                          NULL);
786         if (rc) {
787                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
788                        osp->opd_obd->obd_name, nextdev, rc);
789                 GOTO(out, rc);
790         }
791
792         osp->opd_dt_dev.dd_lu_dev.ld_site =
793                 osp->opd_storage_exp->exp_obd->obd_lu_dev->ld_site;
794         LASSERT(osp->opd_dt_dev.dd_lu_dev.ld_site);
795         osp->opd_storage = lu2dt_dev(osp->opd_storage_exp->exp_obd->obd_lu_dev);
796
797 out:
798         OBD_FREE_PTR(data);
799         RETURN(rc);
800 }
801
802 /**
803  * Initialize OSP device according to the parameters in the configuration
804  * log \a cfg.
805  *
806  * Reconstruct the local device name from the configuration profile, and
807  * initialize necessary threads and structures according to the OSP type
808  * (MDT or OST).
809  *
810  * Since there is no record in the MDT configuration for the local disk
811  * device, we have to extract this from elsewhere in the profile.
812  * The only information we get at setup is from the OSC records:
813  * setup 0:{fsname}-OSTxxxx-osc[-MDTxxxx] 1:lustre-OST0000_UUID 2:NID
814  *
815  * Note: configs generated by Lustre 1.8 are missing the -MDTxxxx part,
816  * so, we need to reconstruct the name of the underlying OSD from this:
817  * {fsname}-{svname}-osd, for example "lustre-MDT0000-osd".
818  *
819  * \param[in] env       execution environment
820  * \param[in] osp       OSP device
821  * \param[in] ldt       lu device type of OSP
822  * \param[in] cfg       configuration log
823  *
824  * \retval 0            0 if OSP initialization succeeded.
825  * \retval negative     negative errno if OSP initialization failed.
826  */
827 static int osp_init0(const struct lu_env *env, struct osp_device *osp,
828                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
829 {
830         struct obd_device       *obd;
831         struct obd_import       *imp;
832         class_uuid_t            uuid;
833         char                    *src, *tgt, *mdt, *osdname = NULL;
834         int                     rc;
835         long                    idx;
836
837         ENTRY;
838
839         mutex_init(&osp->opd_async_requests_mutex);
840
841         obd = class_name2obd(lustre_cfg_string(cfg, 0));
842         if (obd == NULL) {
843                 CERROR("Cannot find obd with name %s\n",
844                        lustre_cfg_string(cfg, 0));
845                 RETURN(-ENODEV);
846         }
847         osp->opd_obd = obd;
848
849         src = lustre_cfg_string(cfg, 0);
850         if (src == NULL)
851                 RETURN(-EINVAL);
852
853         tgt = strrchr(src, '-');
854         if (tgt == NULL) {
855                 CERROR("%s: invalid target name %s: rc = %d\n",
856                        osp->opd_obd->obd_name, lustre_cfg_string(cfg, 0),
857                        -EINVAL);
858                 RETURN(-EINVAL);
859         }
860
861         if (strncmp(tgt, "-osc", 4) == 0) {
862                 /* Old OSC name fsname-OSTXXXX-osc */
863                 for (tgt--; tgt > src && *tgt != '-'; tgt--)
864                         ;
865                 if (tgt == src) {
866                         CERROR("%s: invalid target name %s: rc = %d\n",
867                                osp->opd_obd->obd_name,
868                                lustre_cfg_string(cfg, 0), -EINVAL);
869                         RETURN(-EINVAL);
870                 }
871
872                 if (strncmp(tgt, "-OST", 4) != 0) {
873                         CERROR("%s: invalid target name %s: rc = %d\n",
874                                osp->opd_obd->obd_name,
875                                lustre_cfg_string(cfg, 0), -EINVAL);
876                         RETURN(-EINVAL);
877                 }
878
879                 idx = simple_strtol(tgt + 4, &mdt, 16);
880                 if (mdt[0] != '-' || idx > INT_MAX || idx < 0) {
881                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
882                                osp->opd_obd->obd_name, src, -EINVAL);
883                         RETURN(-EINVAL);
884                 }
885                 osp->opd_index = idx;
886                 osp->opd_group = 0;
887                 idx = tgt - src;
888         } else {
889                 /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
890                 if (strncmp(tgt, "-MDT", 4) != 0 &&
891                     strncmp(tgt, "-OST", 4) != 0) {
892                         CERROR("%s: invalid target name %s: rc = %d\n",
893                                osp->opd_obd->obd_name,
894                                lustre_cfg_string(cfg, 0), -EINVAL);
895                         RETURN(-EINVAL);
896                 }
897
898                 idx = simple_strtol(tgt + 4, &mdt, 16);
899                 if (*mdt != '\0' || idx > INT_MAX || idx < 0) {
900                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
901                                osp->opd_obd->obd_name, src, -EINVAL);
902                         RETURN(-EINVAL);
903                 }
904
905                 /* Get MDT index from the name and set it to opd_group,
906                  * which will be used by OSP to connect with OST */
907                 osp->opd_group = idx;
908                 if (tgt - src <= 12) {
909                         CERROR("%s: invalid mdt index from %s: rc =%d\n",
910                                osp->opd_obd->obd_name,
911                                lustre_cfg_string(cfg, 0), -EINVAL);
912                         RETURN(-EINVAL);
913                 }
914
915                 if (strncmp(tgt - 12, "-MDT", 4) == 0)
916                         osp->opd_connect_mdt = 1;
917
918                 idx = simple_strtol(tgt - 8, &mdt, 16);
919                 if (mdt[0] != '-' || idx > INT_MAX || idx < 0) {
920                         CERROR("%s: invalid OST index in '%s': rc =%d\n",
921                                osp->opd_obd->obd_name, src, -EINVAL);
922                         RETURN(-EINVAL);
923                 }
924
925                 osp->opd_index = idx;
926                 idx = tgt - src - 12;
927         }
928         /* check the fsname length, and after this everything else will fit */
929         if (idx > MTI_NAME_MAXLEN) {
930                 CERROR("%s: fsname too long in '%s': rc = %d\n",
931                        osp->opd_obd->obd_name, src, -EINVAL);
932                 RETURN(-EINVAL);
933         }
934
935         OBD_ALLOC(osdname, MAX_OBD_NAME);
936         if (osdname == NULL)
937                 RETURN(-ENOMEM);
938
939         memcpy(osdname, src, idx); /* copy just the fsname part */
940         osdname[idx] = '\0';
941
942         mdt = strstr(mdt, "-MDT");
943         if (mdt == NULL) /* 1.8 configs don't have "-MDT0000" at the end */
944                 strcat(osdname, "-MDT0000");
945         else
946                 strcat(osdname, mdt);
947         strcat(osdname, "-osd");
948         CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src);
949
950         if (osp->opd_connect_mdt) {
951                 struct client_obd *cli = &osp->opd_obd->u.cli;
952
953                 OBD_ALLOC(cli->cl_rpc_lock, sizeof(*cli->cl_rpc_lock));
954                 if (!cli->cl_rpc_lock)
955                         GOTO(out_fini, rc = -ENOMEM);
956                 osp_init_rpc_lock(cli->cl_rpc_lock);
957         }
958
959         osp->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops;
960         osp->opd_dt_dev.dd_ops = &osp_dt_ops;
961
962         obd->obd_lu_dev = &osp->opd_dt_dev.dd_lu_dev;
963
964         rc = osp_connect_to_osd(env, osp, osdname);
965         if (rc)
966                 GOTO(out_fini, rc);
967
968         rc = ptlrpcd_addref();
969         if (rc)
970                 GOTO(out_disconnect, rc);
971
972         rc = client_obd_setup(obd, cfg);
973         if (rc) {
974                 CERROR("%s: can't setup obd: rc = %d\n", osp->opd_obd->obd_name,
975                        rc);
976                 GOTO(out_ref, rc);
977         }
978
979         osp_lprocfs_init(osp);
980
981         rc = obd_fid_init(osp->opd_obd, NULL, osp->opd_connect_mdt ?
982                           LUSTRE_SEQ_METADATA : LUSTRE_SEQ_DATA);
983         if (rc) {
984                 CERROR("%s: fid init error: rc = %d\n",
985                        osp->opd_obd->obd_name, rc);
986                 GOTO(out_proc, rc);
987         }
988
989         if (!osp->opd_connect_mdt) {
990                 /* Initialize last id from the storage - will be
991                  * used in orphan cleanup. */
992                 rc = osp_last_used_init(env, osp);
993                 if (rc)
994                         GOTO(out_proc, rc);
995
996
997                 /* Initialize precreation thread, it handles new
998                  * connections as well. */
999                 rc = osp_init_precreate(osp);
1000                 if (rc)
1001                         GOTO(out_last_used, rc);
1002         }
1003
1004         /*
1005          * Initialize synhronization mechanism taking
1006          * care of propogating changes to OST in near
1007          * transactional manner.
1008          */
1009         rc = osp_sync_init(env, osp);
1010         if (rc)
1011                 GOTO(out_precreat, rc);
1012
1013         /*
1014          * Initiate connect to OST
1015          */
1016         ll_generate_random_uuid(uuid);
1017         class_uuid_unparse(uuid, &osp->opd_cluuid);
1018
1019         imp = obd->u.cli.cl_import;
1020
1021         rc = ptlrpc_init_import(imp);
1022         if (rc)
1023                 GOTO(out, rc);
1024         if (osdname)
1025                 OBD_FREE(osdname, MAX_OBD_NAME);
1026         RETURN(0);
1027
1028 out:
1029         /* stop sync thread */
1030         osp_sync_fini(osp);
1031 out_precreat:
1032         /* stop precreate thread */
1033         if (!osp->opd_connect_mdt)
1034                 osp_precreate_fini(osp);
1035 out_last_used:
1036         if (!osp->opd_connect_mdt)
1037                 osp_last_used_fini(env, osp);
1038 out_proc:
1039         ptlrpc_lprocfs_unregister_obd(obd);
1040         lprocfs_obd_cleanup(obd);
1041         if (osp->opd_symlink)
1042                 lprocfs_remove(&osp->opd_symlink);
1043         client_obd_cleanup(obd);
1044 out_ref:
1045         ptlrpcd_decref();
1046 out_disconnect:
1047         if (osp->opd_connect_mdt) {
1048                 struct client_obd *cli = &osp->opd_obd->u.cli;
1049                 if (cli->cl_rpc_lock != NULL) {
1050                         OBD_FREE_PTR(cli->cl_rpc_lock);
1051                         cli->cl_rpc_lock = NULL;
1052                 }
1053         }
1054         obd_disconnect(osp->opd_storage_exp);
1055 out_fini:
1056         if (osdname)
1057                 OBD_FREE(osdname, MAX_OBD_NAME);
1058         RETURN(rc);
1059 }
1060
1061 /**
1062  * Implementation of lu_device_type_operations::ldto_device_free
1063  *
1064  * Free the OSP device in memory.  No return value is needed for now,
1065  * so always return NULL to comply with the interface.
1066  *
1067  * \param[in] env       execution environment
1068  * \param[in] lu        lu_device of OSP
1069  *
1070  * \retval NULL         NULL unconditionally
1071  */
1072 static struct lu_device *osp_device_free(const struct lu_env *env,
1073                                          struct lu_device *lu)
1074 {
1075         struct osp_device *osp = lu2osp_dev(lu);
1076
1077         if (atomic_read(&lu->ld_ref) && lu->ld_site) {
1078                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1079                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
1080         }
1081         dt_device_fini(&osp->opd_dt_dev);
1082         OBD_FREE_PTR(osp);
1083
1084         return NULL;
1085 }
1086
1087 /**
1088  * Implementation of lu_device_type_operations::ldto_device_alloc
1089  *
1090  * This function allocates and initializes OSP device in memory according to
1091  * the config log.
1092  *
1093  * \param[in] env       execution environment
1094  * \param[in] type      device type of OSP
1095  * \param[in] lcfg      config log
1096  *
1097  * \retval pointer              the pointer of allocated OSP if succeed.
1098  * \retval ERR_PTR(errno)       ERR_PTR(errno) if failed.
1099  */
1100 static struct lu_device *osp_device_alloc(const struct lu_env *env,
1101                                           struct lu_device_type *type,
1102                                           struct lustre_cfg *lcfg)
1103 {
1104         struct osp_device *osp;
1105         struct lu_device  *ld;
1106
1107         OBD_ALLOC_PTR(osp);
1108         if (osp == NULL) {
1109                 ld = ERR_PTR(-ENOMEM);
1110         } else {
1111                 int rc;
1112
1113                 ld = osp2lu_dev(osp);
1114                 dt_device_init(&osp->opd_dt_dev, type);
1115                 rc = osp_init0(env, osp, type, lcfg);
1116                 if (rc != 0) {
1117                         osp_device_free(env, ld);
1118                         ld = ERR_PTR(rc);
1119                 }
1120         }
1121         return ld;
1122 }
1123
1124 /**
1125  * Implementation of lu_device_type_operations::ldto_device_fini
1126  *
1127  * This function cleans up the OSP device, i.e. release and free those
1128  * attached items in osp_device.
1129  *
1130  * \param[in] env       execution environment
1131  * \param[in] ld        lu_device of OSP
1132  *
1133  * \retval NULL                 NULL if cleanup succeeded.
1134  * \retval ERR_PTR(errno)       ERR_PTR(errno) if cleanup failed.
1135  */
1136 static struct lu_device *osp_device_fini(const struct lu_env *env,
1137                                          struct lu_device *ld)
1138 {
1139         struct osp_device *osp = lu2osp_dev(ld);
1140         struct obd_import *imp;
1141         int                rc;
1142
1143         ENTRY;
1144
1145         if (osp->opd_async_requests != NULL) {
1146                 out_destroy_update_req(osp->opd_async_requests);
1147                 osp->opd_async_requests = NULL;
1148         }
1149
1150         if (osp->opd_storage_exp)
1151                 obd_disconnect(osp->opd_storage_exp);
1152
1153         imp = osp->opd_obd->u.cli.cl_import;
1154
1155         if (imp->imp_rq_pool) {
1156                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
1157                 imp->imp_rq_pool = NULL;
1158         }
1159
1160         if (osp->opd_symlink)
1161                 lprocfs_remove(&osp->opd_symlink);
1162
1163         LASSERT(osp->opd_obd);
1164         ptlrpc_lprocfs_unregister_obd(osp->opd_obd);
1165         lprocfs_obd_cleanup(osp->opd_obd);
1166
1167         if (osp->opd_connect_mdt) {
1168                 struct client_obd *cli = &osp->opd_obd->u.cli;
1169                 if (cli->cl_rpc_lock != NULL) {
1170                         OBD_FREE_PTR(cli->cl_rpc_lock);
1171                         cli->cl_rpc_lock = NULL;
1172                 }
1173         }
1174
1175         rc = client_obd_cleanup(osp->opd_obd);
1176         if (rc != 0) {
1177                 ptlrpcd_decref();
1178                 RETURN(ERR_PTR(rc));
1179         }
1180
1181         ptlrpcd_decref();
1182
1183         RETURN(NULL);
1184 }
1185
1186 /**
1187  * Implementation of obd_ops::o_reconnect
1188  *
1189  * This function is empty and does not need to do anything for now.
1190  */
1191 static int osp_reconnect(const struct lu_env *env,
1192                          struct obd_export *exp, struct obd_device *obd,
1193                          struct obd_uuid *cluuid,
1194                          struct obd_connect_data *data,
1195                          void *localdata)
1196 {
1197         return 0;
1198 }
1199
1200 /*
1201  * Implementation of obd_ops::o_connect
1202  *
1203  * Connect OSP to the remote target (MDT or OST). Allocate the
1204  * export and return it to the LOD, which calls this function
1205  * for each OSP to connect it to the remote target. This function
1206  * is currently only called once per OSP.
1207  *
1208  * \param[in] env       execution environment
1209  * \param[out] exp      export connected to OSP
1210  * \param[in] obd       OSP device
1211  * \param[in] cluuid    OSP device client uuid
1212  * \param[in] data      connect_data to be used to connect to the remote
1213  *                      target
1214  * \param[in] localdata necessary for the API interface, but not used in
1215  *                      this function
1216  *
1217  * \retval 0            0 if the connection succeeded.
1218  * \retval negative     negative errno if the connection failed.
1219  */
1220 static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
1221                            struct obd_device *obd, struct obd_uuid *cluuid,
1222                            struct obd_connect_data *data, void *localdata)
1223 {
1224         struct osp_device       *osp = lu2osp_dev(obd->obd_lu_dev);
1225         struct obd_connect_data *ocd;
1226         struct obd_import       *imp;
1227         struct lustre_handle     conn;
1228         int                      rc;
1229
1230         ENTRY;
1231
1232         CDEBUG(D_CONFIG, "connect #%d\n", osp->opd_connects);
1233
1234         rc = class_connect(&conn, obd, cluuid);
1235         if (rc)
1236                 RETURN(rc);
1237
1238         *exp = class_conn2export(&conn);
1239         /* Why should there ever be more than 1 connect? */
1240         osp->opd_connects++;
1241         LASSERT(osp->opd_connects == 1);
1242
1243         osp->opd_exp = *exp;
1244
1245         imp = osp->opd_obd->u.cli.cl_import;
1246         imp->imp_dlm_handle = conn;
1247
1248         LASSERT(data != NULL);
1249         LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
1250         ocd = &imp->imp_connect_data;
1251         *ocd = *data;
1252
1253         imp->imp_connect_flags_orig = ocd->ocd_connect_flags;
1254
1255         ocd->ocd_version = LUSTRE_VERSION_CODE;
1256         ocd->ocd_index = data->ocd_index;
1257         imp->imp_connect_flags_orig = ocd->ocd_connect_flags;
1258
1259         rc = ptlrpc_connect_import(imp);
1260         if (rc) {
1261                 CERROR("%s: can't connect obd: rc = %d\n", obd->obd_name, rc);
1262                 GOTO(out, rc);
1263         }
1264
1265         ptlrpc_pinger_add_import(imp);
1266 out:
1267         RETURN(rc);
1268 }
1269
1270 /**
1271  * Implementation of obd_ops::o_disconnect
1272  *
1273  * Disconnect the export for the OSP.  This is called by LOD to release the
1274  * OSP during cleanup (\see lod_del_device()). The OSP will be released after
1275  * the export is released.
1276  *
1277  * \param[in] exp       export to be disconnected.
1278  *
1279  * \retval 0            0 if disconnection succeed
1280  * \retval negative     negative errno if disconnection failed
1281  */
1282 static int osp_obd_disconnect(struct obd_export *exp)
1283 {
1284         struct obd_device *obd = exp->exp_obd;
1285         struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
1286         int                rc;
1287         ENTRY;
1288
1289         /* Only disconnect the underlying layers on the final disconnect. */
1290         LASSERT(osp->opd_connects == 1);
1291         osp->opd_connects--;
1292
1293         rc = class_disconnect(exp);
1294         if (rc) {
1295                 CERROR("%s: class disconnect error: rc = %d\n",
1296                        obd->obd_name, rc);
1297                 RETURN(rc);
1298         }
1299
1300         /* destroy the device */
1301         class_manual_cleanup(obd);
1302
1303         RETURN(rc);
1304 }
1305
1306 /**
1307  * Implementation of obd_ops::o_statfs
1308  *
1309  * Send a RPC to the remote target to get statfs status. This is only used
1310  * in lprocfs helpers by obd_statfs.
1311  *
1312  * \param[in] env       execution environment
1313  * \param[in] exp       connection state from this OSP to the parent (LOD)
1314  *                      device
1315  * \param[out] osfs     hold the statfs result
1316  * \param[in] unused    Not used in this function for now
1317  * \param[in] flags     flags to indicate how OSP will issue the RPC
1318  *
1319  * \retval 0            0 if statfs succeeded.
1320  * \retval negative     negative errno if statfs failed.
1321  */
1322 static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp,
1323                           struct obd_statfs *osfs, __u64 unused, __u32 flags)
1324 {
1325         struct obd_statfs       *msfs;
1326         struct ptlrpc_request   *req;
1327         struct obd_import       *imp = NULL;
1328         int                      rc;
1329
1330         ENTRY;
1331
1332         /* Since the request might also come from lprocfs, so we need
1333          * sync this with client_disconnect_export Bug15684 */
1334         down_read(&exp->exp_obd->u.cli.cl_sem);
1335         if (exp->exp_obd->u.cli.cl_import)
1336                 imp = class_import_get(exp->exp_obd->u.cli.cl_import);
1337         up_read(&exp->exp_obd->u.cli.cl_sem);
1338         if (!imp)
1339                 RETURN(-ENODEV);
1340
1341         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
1342
1343         class_import_put(imp);
1344
1345         if (req == NULL)
1346                 RETURN(-ENOMEM);
1347
1348         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
1349         if (rc) {
1350                 ptlrpc_request_free(req);
1351                 RETURN(rc);
1352         }
1353         ptlrpc_request_set_replen(req);
1354         req->rq_request_portal = OST_CREATE_PORTAL;
1355         ptlrpc_at_set_req_timeout(req);
1356
1357         if (flags & OBD_STATFS_NODELAY) {
1358                 /* procfs requests not want stat in wait for avoid deadlock */
1359                 req->rq_no_resend = 1;
1360                 req->rq_no_delay = 1;
1361         }
1362
1363         rc = ptlrpc_queue_wait(req);
1364         if (rc)
1365                 GOTO(out, rc);
1366
1367         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1368         if (msfs == NULL)
1369                 GOTO(out, rc = -EPROTO);
1370
1371         *osfs = *msfs;
1372
1373         EXIT;
1374 out:
1375         ptlrpc_req_finished(req);
1376         return rc;
1377 }
1378
1379 /**
1380  * Prepare fid client.
1381  *
1382  * This function prepares the FID client for the OSP. It will check and assign
1383  * the export (to MDT0) for its FID client, so OSP can allocate super sequence
1384  * or lookup sequence in FLDB of MDT0.
1385  *
1386  * \param[in] osp       OSP device
1387  */
1388 static void osp_prepare_fid_client(struct osp_device *osp)
1389 {
1390         LASSERT(osp->opd_obd->u.cli.cl_seq != NULL);
1391         if (osp->opd_obd->u.cli.cl_seq->lcs_exp != NULL)
1392                 return;
1393
1394         LASSERT(osp->opd_exp != NULL);
1395         osp->opd_obd->u.cli.cl_seq->lcs_exp =
1396                                 class_export_get(osp->opd_exp);
1397 }
1398
1399 /**
1400  * Implementation of obd_ops::o_import_event
1401  *
1402  * This function is called when some related import event happens. It will
1403  * mark the necessary flags according to the event and notify the necessary
1404  * threads (mainly precreate thread).
1405  *
1406  * \param[in] obd       OSP OBD device
1407  * \param[in] imp       import attached from OSP to remote (OST/MDT) service
1408  * \param[in] event     event related to remote service (IMP_EVENT_*)
1409  *
1410  * \retval 0            0 if the event handling succeeded.
1411  * \retval negative     negative errno if the event handling failed.
1412  */
1413 static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
1414                             enum obd_import_event event)
1415 {
1416         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
1417
1418         switch (event) {
1419         case IMP_EVENT_DISCON:
1420                 d->opd_got_disconnected = 1;
1421                 d->opd_imp_connected = 0;
1422                 if (d->opd_connect_mdt)
1423                         break;
1424
1425                 if (d->opd_pre != NULL) {
1426                         osp_pre_update_status(d, -ENODEV);
1427                         wake_up(&d->opd_pre_waitq);
1428                 }
1429
1430                 CDEBUG(D_HA, "got disconnected\n");
1431                 break;
1432         case IMP_EVENT_INACTIVE:
1433                 d->opd_imp_active = 0;
1434                 if (d->opd_connect_mdt)
1435                         break;
1436
1437                 if (d->opd_pre != NULL) {
1438                         osp_pre_update_status(d, -ENODEV);
1439                         wake_up(&d->opd_pre_waitq);
1440                 }
1441
1442                 CDEBUG(D_HA, "got inactive\n");
1443                 break;
1444         case IMP_EVENT_ACTIVE:
1445                 d->opd_imp_active = 1;
1446
1447                 osp_prepare_fid_client(d);
1448                 if (d->opd_got_disconnected)
1449                         d->opd_new_connection = 1;
1450                 d->opd_imp_connected = 1;
1451                 d->opd_imp_seen_connected = 1;
1452                 if (d->opd_connect_mdt)
1453                         break;
1454
1455                 if (d->opd_pre != NULL)
1456                         wake_up(&d->opd_pre_waitq);
1457
1458                 __osp_sync_check_for_work(d);
1459                 CDEBUG(D_HA, "got connected\n");
1460                 break;
1461         case IMP_EVENT_INVALIDATE:
1462                 if (obd->obd_namespace == NULL)
1463                         break;
1464                 ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
1465                 break;
1466         case IMP_EVENT_OCD:
1467         case IMP_EVENT_DEACTIVATE:
1468         case IMP_EVENT_ACTIVATE:
1469                 break;
1470         default:
1471                 CERROR("%s: unsupported import event: %#x\n",
1472                        obd->obd_name, event);
1473         }
1474         return 0;
1475 }
1476
1477 /**
1478  * Implementation of obd_ops: o_iocontrol
1479  *
1480  * This function is the ioctl handler for OSP. Note: lctl will access the OSP
1481  * directly by ioctl, instead of through the MDS stack.
1482  *
1483  * param[in] cmd        ioctl command.
1484  * param[in] exp        export of this OSP.
1485  * param[in] len        data length of \a karg.
1486  * param[in] karg       input argument which is packed as
1487  *                      obd_ioctl_data
1488  * param[out] uarg      pointer to userspace buffer (must access by
1489  *                      copy_to_user()).
1490  *
1491  * \retval 0            0 if the ioctl handling succeeded.
1492  * \retval negative     negative errno if the ioctl handling failed.
1493  */
1494 static int osp_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1495                          void *karg, void *uarg)
1496 {
1497         struct obd_device       *obd = exp->exp_obd;
1498         struct osp_device       *d;
1499         struct obd_ioctl_data   *data = karg;
1500         int                      rc = 0;
1501
1502         ENTRY;
1503
1504         LASSERT(obd->obd_lu_dev);
1505         d = lu2osp_dev(obd->obd_lu_dev);
1506         LASSERT(d->opd_dt_dev.dd_ops == &osp_dt_ops);
1507
1508         if (!try_module_get(THIS_MODULE)) {
1509                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
1510                        module_name(THIS_MODULE));
1511                 return -EINVAL;
1512         }
1513
1514         switch (cmd) {
1515         case OBD_IOC_CLIENT_RECOVER:
1516                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
1517                                            data->ioc_inlbuf1, 0);
1518                 if (rc > 0)
1519                         rc = 0;
1520                 break;
1521         case IOC_OSC_SET_ACTIVE:
1522                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
1523                                               data->ioc_offset);
1524                 break;
1525         case OBD_IOC_PING_TARGET:
1526                 rc = ptlrpc_obd_ping(obd);
1527                 break;
1528         default:
1529                 CERROR("%s: unrecognized ioctl %#x by %s\n", obd->obd_name,
1530                        cmd, current_comm());
1531                 rc = -ENOTTY;
1532         }
1533         module_put(THIS_MODULE);
1534         return rc;
1535 }
1536
1537 /**
1538  * Implementation of obd_ops::o_get_info
1539  *
1540  * Retrieve information by key. Retrieval starts from the top layer
1541  * (MDT) of the MDS stack and traverses the stack by calling the
1542  * obd_get_info() method of the next sub-layer.
1543  *
1544  * \param[in] env       execution environment
1545  * \param[in] exp       export of this OSP
1546  * \param[in] keylen    length of \a key
1547  * \param[in] key       the key
1548  * \param[out] vallen   length of \a val
1549  * \param[out] val      holds the value returned by the key
1550  * \param[in] unused    necessary for the interface but unused
1551  *
1552  * \retval 0            0 if getting information succeeded.
1553  * \retval negative     negative errno if getting information failed.
1554  */
1555 static int osp_obd_get_info(const struct lu_env *env, struct obd_export *exp,
1556                             __u32 keylen, void *key, __u32 *vallen, void *val,
1557                             struct lov_stripe_md *unused)
1558 {
1559         int rc = -EINVAL;
1560
1561         if (KEY_IS(KEY_OSP_CONNECTED)) {
1562                 struct obd_device       *obd = exp->exp_obd;
1563                 struct osp_device       *osp;
1564
1565                 if (!obd->obd_set_up || obd->obd_stopping)
1566                         RETURN(-EAGAIN);
1567
1568                 osp = lu2osp_dev(obd->obd_lu_dev);
1569                 LASSERT(osp);
1570                 /*
1571                  * 1.8/2.0 behaviour is that OST being connected once at least
1572                  * is considered "healthy". and one "healthy" OST is enough to
1573                  * allow lustre clients to connect to MDS
1574                  */
1575                 RETURN(!osp->opd_imp_seen_connected);
1576         }
1577
1578         RETURN(rc);
1579 }
1580
1581 /**
1582  * Implementation of obd_ops: o_fid_alloc
1583  *
1584  * Allocate a FID. There are two cases in which OSP performs
1585  * FID allocation.
1586  *
1587  * 1. FID precreation for data objects, which is done in
1588  *    osp_precreate_fids() instead of this function.
1589  * 2. FID allocation for each sub-stripe of a striped directory.
1590  *    Similar to other FID clients, OSP requests the sequence
1591  *    from its corresponding remote MDT, which in turn requests
1592  *    sequences from the sequence controller (MDT0).
1593  *
1594  * \param[in] env       execution environment
1595  * \param[in] exp       export of the OSP
1596  * \param[out] fid      FID being allocated
1597  * \param[in] unused    necessary for the interface but unused.
1598  *
1599  * \retval 0            0 FID allocated successfully.
1600  * \retval 1            1 FID allocated successfully and new sequence
1601  *                      requested from seq meta server
1602  * \retval negative     negative errno if FID allocation failed.
1603  */
1604 int osp_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1605                   struct lu_fid *fid, struct md_op_data *unused)
1606 {
1607         struct client_obd       *cli = &exp->exp_obd->u.cli;
1608         struct osp_device       *osp = lu2osp_dev(exp->exp_obd->obd_lu_dev);
1609         struct lu_client_seq    *seq = cli->cl_seq;
1610         ENTRY;
1611
1612         LASSERT(osp->opd_obd->u.cli.cl_seq != NULL);
1613         /* Sigh, fid client is not ready yet */
1614         if (osp->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
1615                 RETURN(-ENOTCONN);
1616
1617         RETURN(seq_client_alloc_fid(env, seq, fid));
1618 }
1619
1620 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
1621 LU_KEY_INIT_FINI(osp, struct osp_thread_info);
1622 static void osp_key_exit(const struct lu_context *ctx,
1623                          struct lu_context_key *key, void *data)
1624 {
1625         struct osp_thread_info *info = data;
1626
1627         info->osi_attr.la_valid = 0;
1628 }
1629
1630 struct lu_context_key osp_thread_key = {
1631         .lct_tags = LCT_MD_THREAD,
1632         .lct_init = osp_key_init,
1633         .lct_fini = osp_key_fini,
1634         .lct_exit = osp_key_exit
1635 };
1636
1637 /* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
1638 LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
1639
1640 struct lu_context_key osp_txn_key = {
1641         .lct_tags = LCT_OSP_THREAD | LCT_TX_HANDLE,
1642         .lct_init = osp_txn_key_init,
1643         .lct_fini = osp_txn_key_fini
1644 };
1645 LU_TYPE_INIT_FINI(osp, &osp_thread_key, &osp_txn_key);
1646
1647 static struct lu_device_type_operations osp_device_type_ops = {
1648         .ldto_init           = osp_type_init,
1649         .ldto_fini           = osp_type_fini,
1650
1651         .ldto_start          = osp_type_start,
1652         .ldto_stop           = osp_type_stop,
1653
1654         .ldto_device_alloc   = osp_device_alloc,
1655         .ldto_device_free    = osp_device_free,
1656
1657         .ldto_device_fini    = osp_device_fini
1658 };
1659
1660 static struct lu_device_type osp_device_type = {
1661         .ldt_tags     = LU_DEVICE_DT,
1662         .ldt_name     = LUSTRE_OSP_NAME,
1663         .ldt_ops      = &osp_device_type_ops,
1664         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
1665 };
1666
1667 static struct obd_ops osp_obd_device_ops = {
1668         .o_owner        = THIS_MODULE,
1669         .o_add_conn     = client_import_add_conn,
1670         .o_del_conn     = client_import_del_conn,
1671         .o_reconnect    = osp_reconnect,
1672         .o_connect      = osp_obd_connect,
1673         .o_disconnect   = osp_obd_disconnect,
1674         .o_get_info     = osp_obd_get_info,
1675         .o_import_event = osp_import_event,
1676         .o_iocontrol    = osp_iocontrol,
1677         .o_statfs       = osp_obd_statfs,
1678         .o_fid_init     = client_fid_init,
1679         .o_fid_fini     = client_fid_fini,
1680         .o_fid_alloc    = osp_fid_alloc,
1681 };
1682
1683 struct llog_operations osp_mds_ost_orig_logops;
1684
1685 /**
1686  * Initialize OSP module.
1687  *
1688  * Register device types OSP and Light Weight Proxy (LWP) (\see lwp_dev.c)
1689  * in obd_types (\see class_obd.c).  Initialize procfs for the
1690  * the OSP device.  Note: OSP was called OSC before Lustre 2.4,
1691  * so for compatibility it still uses the name "osc" in procfs.
1692  * This is called at module load time.
1693  *
1694  * \retval 0            0 if initialization succeeds.
1695  * \retval negative     negative errno if initialization failed.
1696  */
1697 static int __init osp_mod_init(void)
1698 {
1699         struct obd_type *type;
1700         int rc;
1701
1702         rc = lu_kmem_init(osp_caches);
1703         if (rc)
1704                 return rc;
1705
1706
1707         rc = class_register_type(&osp_obd_device_ops, NULL, true, NULL,
1708 #ifndef HAVE_ONLY_PROCFS_SEQ
1709                                  NULL,
1710 #endif
1711                                  LUSTRE_OSP_NAME, &osp_device_type);
1712         if (rc != 0) {
1713                 lu_kmem_fini(osp_caches);
1714                 return rc;
1715         }
1716
1717         rc = class_register_type(&lwp_obd_device_ops, NULL, true, NULL,
1718 #ifndef HAVE_ONLY_PROCFS_SEQ
1719                                  NULL,
1720 #endif
1721                                  LUSTRE_LWP_NAME, &lwp_device_type);
1722         if (rc != 0) {
1723                 class_unregister_type(LUSTRE_OSP_NAME);
1724                 lu_kmem_fini(osp_caches);
1725                 return rc;
1726         }
1727
1728         /* Note: add_rec/delcare_add_rec will be only used by catalogs */
1729         osp_mds_ost_orig_logops = llog_osd_ops;
1730         osp_mds_ost_orig_logops.lop_add = llog_cat_add_rec;
1731         osp_mds_ost_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
1732
1733         /* create "osc" entry in procfs for compatibility purposes */
1734         type = class_search_type(LUSTRE_OSC_NAME);
1735         if (type != NULL && type->typ_procroot != NULL)
1736                 return rc;
1737
1738         type = class_search_type(LUSTRE_OSP_NAME);
1739         type->typ_procsym = lprocfs_seq_register("osc", proc_lustre_root,
1740                                                  NULL, NULL);
1741         if (IS_ERR(type->typ_procsym)) {
1742                 CERROR("osp: can't create compat entry \"osc\": %d\n",
1743                        (int) PTR_ERR(type->typ_procsym));
1744                 type->typ_procsym = NULL;
1745         }
1746         return rc;
1747 }
1748
1749 /**
1750  * Finalize OSP module.
1751  *
1752  * This callback is called when kernel unloads OSP module from memory, and
1753  * it will deregister OSP and LWP device type from obd_types (\see class_obd.c).
1754  */
1755 static void __exit osp_mod_exit(void)
1756 {
1757         class_unregister_type(LUSTRE_LWP_NAME);
1758         class_unregister_type(LUSTRE_OSP_NAME);
1759         lu_kmem_fini(osp_caches);
1760 }
1761
1762 MODULE_AUTHOR("Intel, Inc. <http://www.intel.com/>");
1763 MODULE_DESCRIPTION("Lustre OST Proxy Device ("LUSTRE_OSP_NAME")");
1764 MODULE_LICENSE("GPL");
1765
1766 cfs_module(osp, LUSTRE_VERSION_STRING, osp_mod_init, osp_mod_exit);