Whamcloud - gitweb
24832ea55f3384b3ffc7f27b65b6059a73794c73
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
689 static int osc_can_send_destroy(struct client_obd *cli)
690 {
691         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692             cli->cl_max_rpcs_in_flight) {
693                 /* The destroy request can be sent */
694                 return 1;
695         }
696         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697             cli->cl_max_rpcs_in_flight) {
698                 /*
699                  * The counter has been modified between the two atomic
700                  * operations.
701                  */
702                 cfs_waitq_signal(&cli->cl_destroy_waitq);
703         }
704         return 0;
705 }
706
707 /* Destroy requests can be async always on the client, and we don't even really
708  * care about the return code since the client cannot do anything at all about
709  * a destroy failure.
710  * When the MDS is unlinking a filename, it saves the file objects into a
711  * recovery llog, and these object records are cancelled when the OST reports
712  * they were destroyed and sync'd to disk (i.e. transaction committed).
713  * If the client dies, or the OST is down when the object should be destroyed,
714  * the records are not cancelled, and when the OST reconnects to the MDS next,
715  * it will retrieve the llog unlink logs and then sends the log cancellation
716  * cookies to the MDS after committing destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718                        struct obdo *oa, struct lov_stripe_md *ea,
719                        struct obd_trans_info *oti, struct obd_export *md_export,
720                        void *capa)
721 {
722         struct client_obd     *cli = &exp->exp_obd->u.cli;
723         struct ptlrpc_request *req;
724         struct ost_body       *body;
725         CFS_LIST_HEAD(cancels);
726         int rc, count;
727         ENTRY;
728
729         if (!oa) {
730                 CDEBUG(D_INFO, "oa NULL\n");
731                 RETURN(-EINVAL);
732         }
733
734         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735                                         LDLM_FL_DISCARD_DATA);
736
737         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
738         if (req == NULL) {
739                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
740                 RETURN(-ENOMEM);
741         }
742
743         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
745                                0, &cancels, count);
746         if (rc) {
747                 ptlrpc_request_free(req);
748                 RETURN(rc);
749         }
750
751         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752         ptlrpc_at_set_req_timeout(req);
753
754         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755                 oa->o_lcookie = *oti->oti_logcookies;
756         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
757         LASSERT(body);
758         lustre_set_wire_obdo(&body->oa, oa);
759
760         osc_pack_capa(req, body, (struct obd_capa *)capa);
761         ptlrpc_request_set_replen(req);
762
763         /* don't throttle destroy RPCs for the MDT */
764         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
765                 req->rq_interpret_reply = osc_destroy_interpret;
766                 if (!osc_can_send_destroy(cli)) {
767                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
768                                                           NULL);
769
770                         /*
771                          * Wait until the number of on-going destroy RPCs drops
772                          * under max_rpc_in_flight
773                          */
774                         l_wait_event_exclusive(cli->cl_destroy_waitq,
775                                                osc_can_send_destroy(cli), &lwi);
776                 }
777         }
778
779         /* Do not wait for response */
780         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
781         RETURN(0);
782 }
783
784 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
785                                 long writing_bytes)
786 {
787         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
788
789         LASSERT(!(oa->o_valid & bits));
790
791         oa->o_valid |= bits;
792         client_obd_list_lock(&cli->cl_loi_list_lock);
793         oa->o_dirty = cli->cl_dirty;
794         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
795                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
796                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
797                 oa->o_undirty = 0;
798         } else if (cfs_atomic_read(&obd_dirty_pages) -
799                    cfs_atomic_read(&obd_dirty_transit_pages) >
800                    obd_max_dirty_pages + 1){
801                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
802                  * not covered by a lock thus they may safely race and trip
803                  * this CERROR() unless we add in a small fudge factor (+1). */
804                 CERROR("dirty %d - %d > system dirty_max %d\n",
805                        cfs_atomic_read(&obd_dirty_pages),
806                        cfs_atomic_read(&obd_dirty_transit_pages),
807                        obd_max_dirty_pages);
808                 oa->o_undirty = 0;
809         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
810                 CERROR("dirty %lu - dirty_max %lu too big???\n",
811                        cli->cl_dirty, cli->cl_dirty_max);
812                 oa->o_undirty = 0;
813         } else {
814                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
815                                 (cli->cl_max_rpcs_in_flight + 1);
816                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
817         }
818         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
819         oa->o_dropped = cli->cl_lost_grant;
820         cli->cl_lost_grant = 0;
821         client_obd_list_unlock(&cli->cl_loi_list_lock);
822         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
823                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
824
825 }
826
827 void osc_update_next_shrink(struct client_obd *cli)
828 {
829         cli->cl_next_shrink_grant =
830                 cfs_time_shift(cli->cl_grant_shrink_interval);
831         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
832                cli->cl_next_shrink_grant);
833 }
834
835 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
836 {
837         client_obd_list_lock(&cli->cl_loi_list_lock);
838         cli->cl_avail_grant += grant;
839         client_obd_list_unlock(&cli->cl_loi_list_lock);
840 }
841
842 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
843 {
844         if (body->oa.o_valid & OBD_MD_FLGRANT) {
845                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
846                 __osc_update_grant(cli, body->oa.o_grant);
847         }
848 }
849
850 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
851                               obd_count keylen, void *key, obd_count vallen,
852                               void *val, struct ptlrpc_request_set *set);
853
854 static int osc_shrink_grant_interpret(const struct lu_env *env,
855                                       struct ptlrpc_request *req,
856                                       void *aa, int rc)
857 {
858         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
859         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
860         struct ost_body *body;
861
862         if (rc != 0) {
863                 __osc_update_grant(cli, oa->o_grant);
864                 GOTO(out, rc);
865         }
866
867         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
868         LASSERT(body);
869         osc_update_grant(cli, body);
870 out:
871         OBDO_FREE(oa);
872         return rc;
873 }
874
875 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         oa->o_grant = cli->cl_avail_grant / 4;
879         cli->cl_avail_grant -= oa->o_grant;
880         client_obd_list_unlock(&cli->cl_loi_list_lock);
881         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
882                 oa->o_valid |= OBD_MD_FLFLAGS;
883                 oa->o_flags = 0;
884         }
885         oa->o_flags |= OBD_FL_SHRINK_GRANT;
886         osc_update_next_shrink(cli);
887 }
888
889 /* Shrink the current grant, either from some large amount to enough for a
890  * full set of in-flight RPCs, or if we have already shrunk to that limit
891  * then to enough for a single RPC.  This avoids keeping more grant than
892  * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
894 {
895         long target = (cli->cl_max_rpcs_in_flight + 1) *
896                       cli->cl_max_pages_per_rpc;
897
898         client_obd_list_lock(&cli->cl_loi_list_lock);
899         if (cli->cl_avail_grant <= target)
900                 target = cli->cl_max_pages_per_rpc;
901         client_obd_list_unlock(&cli->cl_loi_list_lock);
902
903         return osc_shrink_grant_to_target(cli, target);
904 }
905
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
907 {
908         int    rc = 0;
909         struct ost_body     *body;
910         ENTRY;
911
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         /* Don't shrink if we are already above or below the desired limit
914          * We don't want to shrink below a single RPC, as that will negatively
915          * impact block allocation and long-term performance. */
916         if (target < cli->cl_max_pages_per_rpc)
917                 target = cli->cl_max_pages_per_rpc;
918
919         if (target >= cli->cl_avail_grant) {
920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
921                 RETURN(0);
922         }
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925         OBD_ALLOC_PTR(body);
926         if (!body)
927                 RETURN(-ENOMEM);
928
929         osc_announce_cached(cli, &body->oa, 0);
930
931         client_obd_list_lock(&cli->cl_loi_list_lock);
932         body->oa.o_grant = cli->cl_avail_grant - target;
933         cli->cl_avail_grant = target;
934         client_obd_list_unlock(&cli->cl_loi_list_lock);
935         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
936                 body->oa.o_valid |= OBD_MD_FLFLAGS;
937                 body->oa.o_flags = 0;
938         }
939         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
940         osc_update_next_shrink(cli);
941
942         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
943                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
944                                 sizeof(*body), body, NULL);
945         if (rc != 0)
946                 __osc_update_grant(cli, body->oa.o_grant);
947         OBD_FREE_PTR(body);
948         RETURN(rc);
949 }
950
951 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
952 static int osc_should_shrink_grant(struct client_obd *client)
953 {
954         cfs_time_t time = cfs_time_current();
955         cfs_time_t next_shrink = client->cl_next_shrink_grant;
956
957         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
958              OBD_CONNECT_GRANT_SHRINK) == 0)
959                 return 0;
960
961         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
962                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
963                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
964                         return 1;
965                 else
966                         osc_update_next_shrink(client);
967         }
968         return 0;
969 }
970
971 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
972 {
973         struct client_obd *client;
974
975         cfs_list_for_each_entry(client, &item->ti_obd_list,
976                                 cl_grant_shrink_list) {
977                 if (osc_should_shrink_grant(client))
978                         osc_shrink_grant(client);
979         }
980         return 0;
981 }
982
983 static int osc_add_shrink_grant(struct client_obd *client)
984 {
985         int rc;
986
987         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
988                                        TIMEOUT_GRANT,
989                                        osc_grant_shrink_grant_cb, NULL,
990                                        &client->cl_grant_shrink_list);
991         if (rc) {
992                 CERROR("add grant client %s error %d\n",
993                         client->cl_import->imp_obd->obd_name, rc);
994                 return rc;
995         }
996         CDEBUG(D_CACHE, "add grant client %s \n",
997                client->cl_import->imp_obd->obd_name);
998         osc_update_next_shrink(client);
999         return 0;
1000 }
1001
1002 static int osc_del_shrink_grant(struct client_obd *client)
1003 {
1004         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1005                                          TIMEOUT_GRANT);
1006 }
1007
1008 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1009 {
1010         /*
1011          * ocd_grant is the total grant amount we're expect to hold: if we've
1012          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1013          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1014          *
1015          * race is tolerable here: if we're evicted, but imp_state already
1016          * left EVICTED state, then cl_dirty must be 0 already.
1017          */
1018         client_obd_list_lock(&cli->cl_loi_list_lock);
1019         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1020                 cli->cl_avail_grant = ocd->ocd_grant;
1021         else
1022                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1023
1024         if (cli->cl_avail_grant < 0) {
1025                 CWARN("%s: available grant < 0, the OSS is probably not running"
1026                       " with patch from bug20278 (%ld) \n",
1027                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1028                 /* workaround for 1.6 servers which do not have
1029                  * the patch from bug20278 */
1030                 cli->cl_avail_grant = ocd->ocd_grant;
1031         }
1032
1033         /* determine the appropriate chunk size used by osc_extent. */
1034         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1035         client_obd_list_unlock(&cli->cl_loi_list_lock);
1036
1037         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1038                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1039                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1040
1041         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1042             cfs_list_empty(&cli->cl_grant_shrink_list))
1043                 osc_add_shrink_grant(cli);
1044 }
1045
1046 /* We assume that the reason this OSC got a short read is because it read
1047  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1048  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1049  * this stripe never got written at or beyond this stripe offset yet. */
1050 static void handle_short_read(int nob_read, obd_count page_count,
1051                               struct brw_page **pga)
1052 {
1053         char *ptr;
1054         int i = 0;
1055
1056         /* skip bytes read OK */
1057         while (nob_read > 0) {
1058                 LASSERT (page_count > 0);
1059
1060                 if (pga[i]->count > nob_read) {
1061                         /* EOF inside this page */
1062                         ptr = cfs_kmap(pga[i]->pg) +
1063                                 (pga[i]->off & ~CFS_PAGE_MASK);
1064                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1065                         cfs_kunmap(pga[i]->pg);
1066                         page_count--;
1067                         i++;
1068                         break;
1069                 }
1070
1071                 nob_read -= pga[i]->count;
1072                 page_count--;
1073                 i++;
1074         }
1075
1076         /* zero remaining pages */
1077         while (page_count-- > 0) {
1078                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1079                 memset(ptr, 0, pga[i]->count);
1080                 cfs_kunmap(pga[i]->pg);
1081                 i++;
1082         }
1083 }
1084
1085 static int check_write_rcs(struct ptlrpc_request *req,
1086                            int requested_nob, int niocount,
1087                            obd_count page_count, struct brw_page **pga)
1088 {
1089         int     i;
1090         __u32   *remote_rcs;
1091
1092         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1093                                                   sizeof(*remote_rcs) *
1094                                                   niocount);
1095         if (remote_rcs == NULL) {
1096                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1097                 return(-EPROTO);
1098         }
1099
1100         /* return error if any niobuf was in error */
1101         for (i = 0; i < niocount; i++) {
1102                 if ((int)remote_rcs[i] < 0)
1103                         return(remote_rcs[i]);
1104
1105                 if (remote_rcs[i] != 0) {
1106                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1107                                 i, remote_rcs[i], req);
1108                         return(-EPROTO);
1109                 }
1110         }
1111
1112         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1113                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1114                        req->rq_bulk->bd_nob_transferred, requested_nob);
1115                 return(-EPROTO);
1116         }
1117
1118         return (0);
1119 }
1120
1121 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1122 {
1123         if (p1->flag != p2->flag) {
1124                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1125                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1126
1127                 /* warn if we try to combine flags that we don't know to be
1128                  * safe to combine */
1129                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1130                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1131                               "report this at http://bugs.whamcloud.com/\n",
1132                               p1->flag, p2->flag);
1133                 }
1134                 return 0;
1135         }
1136
1137         return (p1->off + p1->count == p2->off);
1138 }
1139
1140 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1141                                    struct brw_page **pga, int opc,
1142                                    cksum_type_t cksum_type)
1143 {
1144         __u32 cksum;
1145         int i = 0;
1146
1147         LASSERT (pg_count > 0);
1148         cksum = init_checksum(cksum_type);
1149         while (nob > 0 && pg_count > 0) {
1150                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1151                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1152                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1153
1154                 /* corrupt the data before we compute the checksum, to
1155                  * simulate an OST->client data error */
1156                 if (i == 0 && opc == OST_READ &&
1157                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1158                         memcpy(ptr + off, "bad1", min(4, nob));
1159                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1160                 cfs_kunmap(pga[i]->pg);
1161                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1162                                off, cksum);
1163
1164                 nob -= pga[i]->count;
1165                 pg_count--;
1166                 i++;
1167         }
1168         /* For sending we only compute the wrong checksum instead
1169          * of corrupting the data so it is still correct on a redo */
1170         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1171                 cksum++;
1172
1173         return fini_checksum(cksum, cksum_type);
1174 }
1175
/* Build (but do not send) the bulk read or write request for the
 * \a page_count pages in \a pga.
 *
 * \a cmd is only tested for OBD_BRW_WRITE: writes become OST_WRITE requests
 * allocated from the import's request pool, everything else an OST_READ.
 * Adjacent pages that can_merge_pages() accepts share one remote niobuf.
 * The wire obdo is filled from \a oa; cached/grant information is announced
 * through osc_announce_cached(), \a resend adds OBD_FL_RECOV_RESEND, and a
 * grant shrink is piggy-backed when osc_should_shrink_grant() says so.
 * When checksums are enabled on the client and the security flavor does not
 * already cover bulk data, writes carry a checksum of the pages (also saved
 * in \a oa) and reads ask the server for one.
 * \a lsm is not referenced in this function.  A reference on \a ocapa is
 * stored in the async args only when \a reserve is set.
 *
 * On success the request is stored in \a *reqp and 0 is returned.  Errors
 * are -ENOMEM / -EINVAL from the fail-injection checks, -ENOMEM when the
 * request or the bulk descriptor cannot be allocated, or the error from
 * ptlrpc_request_pack(). */
1176 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1177                                 struct lov_stripe_md *lsm, obd_count page_count,
1178                                 struct brw_page **pga,
1179                                 struct ptlrpc_request **reqp,
1180                                 struct obd_capa *ocapa, int reserve,
1181                                 int resend)
1182 {
1183         struct ptlrpc_request   *req;
1184         struct ptlrpc_bulk_desc *desc;
1185         struct ost_body         *body;
1186         struct obd_ioobj        *ioobj;
1187         struct niobuf_remote    *niobuf;
1188         int niocount, i, requested_nob, opc, rc;
1189         struct osc_brw_async_args *aa;
1190         struct req_capsule      *pill;
1191         struct brw_page *pg_prev;
1192
1193         ENTRY;
1194         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1195                 RETURN(-ENOMEM); /* Recoverable */
1196         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1197                 RETURN(-EINVAL); /* Fatal */
1198
1199         if ((cmd & OBD_BRW_WRITE) != 0) {
1200                 opc = OST_WRITE;
1201                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1202                                                 cli->cl_import->imp_rq_pool,
1203                                                 &RQF_OST_BRW_WRITE);
1204         } else {
1205                 opc = OST_READ;
1206                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1207         }
1208         if (req == NULL)
1209                 RETURN(-ENOMEM);
1210
             /* count the remote niobufs: one, plus one more for every pair of
              * neighbouring pages that cannot be merged */
1211         for (niocount = i = 1; i < page_count; i++) {
1212                 if (!can_merge_pages(pga[i - 1], pga[i]))
1213                         niocount++;
1214         }
1215
1216         pill = &req->rq_pill;
1217         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1218                              sizeof(*ioobj));
1219         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1220                              niocount * sizeof(*niobuf));
1221         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1222
1223         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1224         if (rc) {
1225                 ptlrpc_request_free(req);
1226                 RETURN(rc);
1227         }
1228         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1229         ptlrpc_at_set_req_timeout(req);
1230
1231         if (opc == OST_WRITE)
1232                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1233                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1234         else
1235                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1236                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1237
1238         if (desc == NULL)
1239                 GOTO(out, rc = -ENOMEM);
1240         /* NB request now owns desc and will free it when it gets freed */
1241
1242         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1243         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1244         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1245         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1246
1247         lustre_set_wire_obdo(&body->oa, oa);
1248
1249         obdo_to_ioobj(oa, ioobj);
1250         ioobj->ioo_bufcnt = niocount;
1251         osc_pack_capa(req, body, ocapa);
1252         LASSERT (page_count > 0);
1253         pg_prev = pga[0];
             /* add every page to the bulk descriptor and describe it in the
              * niobuf array; niobuf ends up one past the last entry used */
1254         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1255                 struct brw_page *pg = pga[i];
1256                 int poff = pg->off & ~CFS_PAGE_MASK;
1257
1258                 LASSERT(pg->count > 0);
1259                 /* make sure there is no gap in the middle of page array */
1260                 LASSERTF(page_count == 1 ||
1261                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1262                           ergo(i > 0 && i < page_count - 1,
1263                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1264                           ergo(i == page_count - 1, poff == 0)),
1265                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1266                          i, page_count, pg, pg->off, pg->count);
1267 #ifdef __linux__
1268                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1269                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1270                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1271                          i, page_count,
1272                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1273                          pg_prev->pg, page_private(pg_prev->pg),
1274                          pg_prev->pg->index, pg_prev->off);
1275 #else
1276                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1277                          "i %d p_c %u\n", i, page_count);
1278 #endif
1279                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1280                         (pg->flag & OBD_BRW_SRVLOCK));
1281
1282                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1283                 requested_nob += pg->count;
1284
                     /* a mergeable page extends the previous niobuf: step back
                      * so that the loop increment lands on the same slot again */
1285                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1286                         niobuf--;
1287                         niobuf->len += pg->count;
1288                 } else {
1289                         niobuf->offset = pg->off;
1290                         niobuf->len    = pg->count;
1291                         niobuf->flags  = pg->flag;
1292                 }
1293                 pg_prev = pg;
1294         }
1295
             /* cross-check: exactly niocount niobufs must have been filled */
1296         LASSERTF((void *)(niobuf - niocount) ==
1297                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1298                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1299                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1300
1301         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1302         if (resend) {
1303                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1304                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1305                         body->oa.o_flags = 0;
1306                 }
1307                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1308         }
1309
1310         if (osc_should_shrink_grant(cli))
1311                 osc_shrink_grant_local(cli, &body->oa);
1312
1313         /* size[REQ_REC_OFF] still sizeof (*body) */
1314         if (opc == OST_WRITE) {
1315                 if (cli->cl_checksum &&
1316                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1317                         /* store cl_cksum_type in a local variable since
1318                          * it can be changed via lprocfs */
1319                         cksum_type_t cksum_type = cli->cl_cksum_type;
1320
1321                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1322                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1323                                 body->oa.o_flags = 0;
1324                         }
1325                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1326                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1327                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1328                                                              page_count, pga,
1329                                                              OST_WRITE,
1330                                                              cksum_type);
1331                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1332                                body->oa.o_cksum);
1333                         /* save this in 'oa', too, for later checking */
1334                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1335                         oa->o_flags |= cksum_type_pack(cksum_type);
1336                 } else {
1337                         /* clear out the checksum flag, in case this is a
1338                          * resend but cl_checksum is no longer set. b=11238 */
1339                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1340                 }
1341                 oa->o_cksum = body->oa.o_cksum;
1342                 /* 1 RC per niobuf */
1343                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1344                                      sizeof(__u32) * niocount);
1345         } else {
                     /* reads only announce the checksum type they want; the
                      * sum itself is computed when the reply is processed */
1346                 if (cli->cl_checksum &&
1347                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1348                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1349                                 body->oa.o_flags = 0;
1350                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1351                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1352                 }
1353         }
1354         ptlrpc_request_set_replen(req);
1355
             /* stash what the reply handling needs in the request's
              * embedded async-args area */
1356         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1357         aa = ptlrpc_req_async_args(req);
1358         aa->aa_oa = oa;
1359         aa->aa_requested_nob = requested_nob;
1360         aa->aa_nio_count = niocount;
1361         aa->aa_page_count = page_count;
1362         aa->aa_resends = 0;
1363         aa->aa_ppga = pga;
1364         aa->aa_cli = cli;
1365         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1366         if (ocapa && reserve)
1367                 aa->aa_ocapa = capa_get(ocapa);
1368
1369         *reqp = req;
1370         RETURN(0);
1371
1372  out:
1373         ptlrpc_req_finished(req);
1374         RETURN(rc);
1375 }
1376
1377 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1378                                 __u32 client_cksum, __u32 server_cksum, int nob,
1379                                 obd_count page_count, struct brw_page **pga,
1380                                 cksum_type_t client_cksum_type)
1381 {
1382         __u32 new_cksum;
1383         char *msg;
1384         cksum_type_t cksum_type;
1385
1386         if (server_cksum == client_cksum) {
1387                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1388                 return 0;
1389         }
1390
1391         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1392                                        oa->o_flags : 0);
1393         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1394                                       cksum_type);
1395
1396         if (cksum_type != client_cksum_type)
1397                 msg = "the server did not use the checksum type specified in "
1398                       "the original request - likely a protocol problem";
1399         else if (new_cksum == server_cksum)
1400                 msg = "changed on the client after we checksummed it - "
1401                       "likely false positive due to mmap IO (bug 11742)";
1402         else if (new_cksum == client_cksum)
1403                 msg = "changed in transit before arrival at OST";
1404         else
1405                 msg = "changed in transit AND doesn't match the original - "
1406                       "likely false positive due to mmap IO (bug 11742)";
1407
1408         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1409                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1410                            msg, libcfs_nid2str(peer->nid),
1411                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1412                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1413                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1414                            oa->o_id,
1415                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1416                            pga[0]->off,
1417                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1418         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1419                "client csum now %x\n", client_cksum, client_cksum_type,
1420                server_cksum, cksum_type, new_cksum);
1421         return 1;
1422 }
1423
1424 /* Note rc enters this function as number of bytes transferred */
1425 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1426 {
1427         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1428         const lnet_process_id_t *peer =
1429                         &req->rq_import->imp_connection->c_peer;
1430         struct client_obd *cli = aa->aa_cli;
1431         struct ost_body *body;
1432         __u32 client_cksum = 0;
1433         ENTRY;
1434
1435         if (rc < 0 && rc != -EDQUOT) {
1436                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1437                 RETURN(rc);
1438         }
1439
1440         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1441         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1442         if (body == NULL) {
1443                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1444                 RETURN(-EPROTO);
1445         }
1446
1447         /* set/clear over quota flag for a uid/gid */
1448         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1449             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1450                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1451
1452                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1453                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1454                        body->oa.o_flags);
1455                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1456         }
1457
1458         osc_update_grant(cli, body);
1459
1460         if (rc < 0)
1461                 RETURN(rc);
1462
1463         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1464                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1465
1466         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1467                 if (rc > 0) {
1468                         CERROR("Unexpected +ve rc %d\n", rc);
1469                         RETURN(-EPROTO);
1470                 }
1471                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1472
1473                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1474                         RETURN(-EAGAIN);
1475
1476                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1477                     check_write_checksum(&body->oa, peer, client_cksum,
1478                                          body->oa.o_cksum, aa->aa_requested_nob,
1479                                          aa->aa_page_count, aa->aa_ppga,
1480                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1481                         RETURN(-EAGAIN);
1482
1483                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1484                                      aa->aa_page_count, aa->aa_ppga);
1485                 GOTO(out, rc);
1486         }
1487
1488         /* The rest of this function executes only for OST_READs */
1489
1490         /* if unwrap_bulk failed, return -EAGAIN to retry */
1491         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1492         if (rc < 0)
1493                 GOTO(out, rc = -EAGAIN);
1494
1495         if (rc > aa->aa_requested_nob) {
1496                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1497                        aa->aa_requested_nob);
1498                 RETURN(-EPROTO);
1499         }
1500
1501         if (rc != req->rq_bulk->bd_nob_transferred) {
1502                 CERROR ("Unexpected rc %d (%d transferred)\n",
1503                         rc, req->rq_bulk->bd_nob_transferred);
1504                 return (-EPROTO);
1505         }
1506
1507         if (rc < aa->aa_requested_nob)
1508                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1509
1510         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1511                 static int cksum_counter;
1512                 __u32      server_cksum = body->oa.o_cksum;
1513                 char      *via;
1514                 char      *router;
1515                 cksum_type_t cksum_type;
1516
1517                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1518                                                body->oa.o_flags : 0);
1519                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1520                                                  aa->aa_ppga, OST_READ,
1521                                                  cksum_type);
1522
1523                 if (peer->nid == req->rq_bulk->bd_sender) {
1524                         via = router = "";
1525                 } else {
1526                         via = " via ";
1527                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1528                 }
1529
1530                 if (server_cksum == ~0 && rc > 0) {
1531                         CERROR("Protocol error: server %s set the 'checksum' "
1532                                "bit, but didn't send a checksum.  Not fatal, "
1533                                "but please notify on http://bugs.whamcloud.com/\n",
1534                                libcfs_nid2str(peer->nid));
1535                 } else if (server_cksum != client_cksum) {
1536                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1537                                            "%s%s%s inode "DFID" object "
1538                                            LPU64"/"LPU64" extent "
1539                                            "["LPU64"-"LPU64"]\n",
1540                                            req->rq_import->imp_obd->obd_name,
1541                                            libcfs_nid2str(peer->nid),
1542                                            via, router,
1543                                            body->oa.o_valid & OBD_MD_FLFID ?
1544                                                 body->oa.o_parent_seq : (__u64)0,
1545                                            body->oa.o_valid & OBD_MD_FLFID ?
1546                                                 body->oa.o_parent_oid : 0,
1547                                            body->oa.o_valid & OBD_MD_FLFID ?
1548                                                 body->oa.o_parent_ver : 0,
1549                                            body->oa.o_id,
1550                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1551                                                 body->oa.o_seq : (__u64)0,
1552                                            aa->aa_ppga[0]->off,
1553                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1554                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1555                                                                         1);
1556                         CERROR("client %x, server %x, cksum_type %x\n",
1557                                client_cksum, server_cksum, cksum_type);
1558                         cksum_counter = 0;
1559                         aa->aa_oa->o_cksum = client_cksum;
1560                         rc = -EAGAIN;
1561                 } else {
1562                         cksum_counter++;
1563                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1564                         rc = 0;
1565                 }
1566         } else if (unlikely(client_cksum)) {
1567                 static int cksum_missed;
1568
1569                 cksum_missed++;
1570                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1571                         CERROR("Checksum %u requested from %s but not sent\n",
1572                                cksum_missed, libcfs_nid2str(peer->nid));
1573         } else {
1574                 rc = 0;
1575         }
1576 out:
1577         if (rc >= 0)
1578                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1579
1580         RETURN(rc);
1581 }
1582
1583 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1584                             struct lov_stripe_md *lsm,
1585                             obd_count page_count, struct brw_page **pga,
1586                             struct obd_capa *ocapa)
1587 {
1588         struct ptlrpc_request *req;
1589         int                    rc;
1590         cfs_waitq_t            waitq;
1591         int                    generation, resends = 0;
1592         struct l_wait_info     lwi;
1593
1594         ENTRY;
1595
1596         cfs_waitq_init(&waitq);
1597         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1598
1599 restart_bulk:
1600         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1601                                   page_count, pga, &req, ocapa, 0, resends);
1602         if (rc != 0)
1603                 return (rc);
1604
1605         if (resends) {
1606                 req->rq_generation_set = 1;
1607                 req->rq_import_generation = generation;
1608                 req->rq_sent = cfs_time_current_sec() + resends;
1609         }
1610
1611         rc = ptlrpc_queue_wait(req);
1612
1613         if (rc == -ETIMEDOUT && req->rq_resend) {
1614                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1615                 ptlrpc_req_finished(req);
1616                 goto restart_bulk;
1617         }
1618
1619         rc = osc_brw_fini_request(req, rc);
1620
1621         ptlrpc_req_finished(req);
1622         /* When server return -EINPROGRESS, client should always retry
1623          * regardless of the number of times the bulk was resent already.*/
1624         if (osc_recoverable_error(rc)) {
1625                 resends++;
1626                 if (rc != -EINPROGRESS &&
1627                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1628                         CERROR("%s: too many resend retries for object: "
1629                                ""LPU64":"LPU64", rc = %d.\n",
1630                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1631                         goto out;
1632                 }
1633                 if (generation !=
1634                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1635                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1636                                ""LPU64":"LPU64", rc = %d.\n",
1637                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1638                         goto out;
1639                 }
1640
1641                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1642                                        NULL);
1643                 l_wait_event(waitq, 0, &lwi);
1644
1645                 goto restart_bulk;
1646         }
1647 out:
1648         if (rc == -EAGAIN || rc == -EINPROGRESS)
1649                 rc = -EIO;
1650         RETURN (rc);
1651 }
1652
1653 int osc_brw_redo_request(struct ptlrpc_request *request,
1654                          struct osc_brw_async_args *aa)
1655 {
1656         struct ptlrpc_request *new_req;
1657         struct osc_brw_async_args *new_aa;
1658         struct osc_async_page *oap;
1659         int rc = 0;
1660         ENTRY;
1661
1662         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1663
1664         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1665                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1666                                   aa->aa_cli, aa->aa_oa,
1667                                   NULL /* lsm unused by osc currently */,
1668                                   aa->aa_page_count, aa->aa_ppga,
1669                                   &new_req, aa->aa_ocapa, 0, 1);
1670         if (rc)
1671                 RETURN(rc);
1672
1673         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1674                 if (oap->oap_request != NULL) {
1675                         LASSERTF(request == oap->oap_request,
1676                                  "request %p != oap_request %p\n",
1677                                  request, oap->oap_request);
1678                         if (oap->oap_interrupted) {
1679                                 ptlrpc_req_finished(new_req);
1680                                 RETURN(-EINTR);
1681                         }
1682                 }
1683         }
1684         /* New request takes over pga and oaps from old request.
1685          * Note that copying a list_head doesn't work, need to move it... */
1686         aa->aa_resends++;
1687         new_req->rq_interpret_reply = request->rq_interpret_reply;
1688         new_req->rq_async_args = request->rq_async_args;
1689         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1690         new_req->rq_generation_set = 1;
1691         new_req->rq_import_generation = request->rq_import_generation;
1692
1693         new_aa = ptlrpc_req_async_args(new_req);
1694
1695         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1696         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1697         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1698         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1699
1700         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1701                 if (oap->oap_request) {
1702                         ptlrpc_req_finished(oap->oap_request);
1703                         oap->oap_request = ptlrpc_request_addref(new_req);
1704                 }
1705         }
1706
1707         new_aa->aa_ocapa = aa->aa_ocapa;
1708         aa->aa_ocapa = NULL;
1709
1710         /* XXX: This code will run into problem if we're going to support
1711          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1712          * and wait for all of them to be finished. We should inherit request
1713          * set from old request. */
1714         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1715
1716         DEBUG_REQ(D_INFO, new_req, "new request");
1717         RETURN(0);
1718 }
1719
1720 /*
1721  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1722  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1723  * fine for our small page arrays and doesn't require allocation.  its an
1724  * insertion sort that swaps elements that are strides apart, shrinking the
1725  * stride down until its '1' and the array is sorted.
1726  */
1727 static void sort_brw_pages(struct brw_page **array, int num)
1728 {
1729         int stride, i, j;
1730         struct brw_page *tmp;
1731
1732         if (num == 1)
1733                 return;
1734         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1735                 ;
1736
1737         do {
1738                 stride /= 3;
1739                 for (i = stride ; i < num ; i++) {
1740                         tmp = array[i];
1741                         j = i;
1742                         while (j >= stride && array[j - stride]->off > tmp->off) {
1743                                 array[j] = array[j - stride];
1744                                 j -= stride;
1745                         }
1746                         array[j] = tmp;
1747                 }
1748         } while (stride > 1);
1749 }
1750
1751 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1752 {
1753         int count = 1;
1754         int offset;
1755         int i = 0;
1756
1757         LASSERT (pages > 0);
1758         offset = pg[i]->off & ~CFS_PAGE_MASK;
1759
1760         for (;;) {
1761                 pages--;
1762                 if (pages == 0)         /* that's all */
1763                         return count;
1764
1765                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1766                         return count;   /* doesn't end on page boundary */
1767
1768                 i++;
1769                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1770                 if (offset != 0)        /* doesn't start on page boundary */
1771                         return count;
1772
1773                 count++;
1774         }
1775 }
1776
1777 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1778 {
1779         struct brw_page **ppga;
1780         int i;
1781
1782         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1783         if (ppga == NULL)
1784                 return NULL;
1785
1786         for (i = 0; i < count; i++)
1787                 ppga[i] = pga + i;
1788         return ppga;
1789 }
1790
1791 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1792 {
1793         LASSERT(ppga != NULL);
1794         OBD_FREE(ppga, sizeof(*ppga) * count);
1795 }
1796
1797 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1798                    obd_count page_count, struct brw_page *pga,
1799                    struct obd_trans_info *oti)
1800 {
1801         struct obdo *saved_oa = NULL;
1802         struct brw_page **ppga, **orig;
1803         struct obd_import *imp = class_exp2cliimp(exp);
1804         struct client_obd *cli;
1805         int rc, page_count_orig;
1806         ENTRY;
1807
1808         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1809         cli = &imp->imp_obd->u.cli;
1810
1811         if (cmd & OBD_BRW_CHECK) {
1812                 /* The caller just wants to know if there's a chance that this
1813                  * I/O can succeed */
1814
1815                 if (imp->imp_invalid)
1816                         RETURN(-EIO);
1817                 RETURN(0);
1818         }
1819
1820         /* test_brw with a failed create can trip this, maybe others. */
1821         LASSERT(cli->cl_max_pages_per_rpc);
1822
1823         rc = 0;
1824
1825         orig = ppga = osc_build_ppga(pga, page_count);
1826         if (ppga == NULL)
1827                 RETURN(-ENOMEM);
1828         page_count_orig = page_count;
1829
1830         sort_brw_pages(ppga, page_count);
1831         while (page_count) {
1832                 obd_count pages_per_brw;
1833
1834                 if (page_count > cli->cl_max_pages_per_rpc)
1835                         pages_per_brw = cli->cl_max_pages_per_rpc;
1836                 else
1837                         pages_per_brw = page_count;
1838
1839                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1840
1841                 if (saved_oa != NULL) {
1842                         /* restore previously saved oa */
1843                         *oinfo->oi_oa = *saved_oa;
1844                 } else if (page_count > pages_per_brw) {
1845                         /* save a copy of oa (brw will clobber it) */
1846                         OBDO_ALLOC(saved_oa);
1847                         if (saved_oa == NULL)
1848                                 GOTO(out, rc = -ENOMEM);
1849                         *saved_oa = *oinfo->oi_oa;
1850                 }
1851
1852                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1853                                       pages_per_brw, ppga, oinfo->oi_capa);
1854
1855                 if (rc != 0)
1856                         break;
1857
1858                 page_count -= pages_per_brw;
1859                 ppga += pages_per_brw;
1860         }
1861
1862 out:
1863         osc_release_ppga(orig, page_count_orig);
1864
1865         if (saved_oa != NULL)
1866                 OBDO_FREE(saved_oa);
1867
1868         RETURN(rc);
1869 }
1870
1871 static int brw_interpret(const struct lu_env *env,
1872                          struct ptlrpc_request *req, void *data, int rc)
1873 {
1874         struct osc_brw_async_args *aa = data;
1875         struct osc_extent *ext;
1876         struct osc_extent *tmp;
1877         struct cl_object  *obj = NULL;
1878         struct client_obd *cli = aa->aa_cli;
1879         ENTRY;
1880
1881         rc = osc_brw_fini_request(req, rc);
1882         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1883         /* When server return -EINPROGRESS, client should always retry
1884          * regardless of the number of times the bulk was resent already. */
1885         if (osc_recoverable_error(rc)) {
1886                 if (req->rq_import_generation !=
1887                     req->rq_import->imp_generation) {
1888                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1889                                ""LPU64":"LPU64", rc = %d.\n",
1890                                req->rq_import->imp_obd->obd_name,
1891                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1892                 } else if (rc == -EINPROGRESS ||
1893                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1894                         rc = osc_brw_redo_request(req, aa);
1895                 } else {
1896                         CERROR("%s: too many resent retries for object: "
1897                                ""LPU64":"LPU64", rc = %d.\n",
1898                                req->rq_import->imp_obd->obd_name,
1899                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1900                 }
1901
1902                 if (rc == 0)
1903                         RETURN(0);
1904                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1905                         rc = -EIO;
1906         }
1907
1908         if (aa->aa_ocapa) {
1909                 capa_put(aa->aa_ocapa);
1910                 aa->aa_ocapa = NULL;
1911         }
1912
1913         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1914                 if (obj == NULL && rc == 0) {
1915                         obj = osc2cl(ext->oe_obj);
1916                         cl_object_get(obj);
1917                 }
1918
1919                 cfs_list_del_init(&ext->oe_link);
1920                 osc_extent_finish(env, ext, 1, rc);
1921         }
1922         LASSERT(cfs_list_empty(&aa->aa_exts));
1923         LASSERT(cfs_list_empty(&aa->aa_oaps));
1924
1925         if (obj != NULL) {
1926                 struct obdo *oa = aa->aa_oa;
1927                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1928                 unsigned long valid = 0;
1929
1930                 LASSERT(rc == 0);
1931                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1932                         attr->cat_blocks = oa->o_blocks;
1933                         valid |= CAT_BLOCKS;
1934                 }
1935                 if (oa->o_valid & OBD_MD_FLMTIME) {
1936                         attr->cat_mtime = oa->o_mtime;
1937                         valid |= CAT_MTIME;
1938                 }
1939                 if (oa->o_valid & OBD_MD_FLATIME) {
1940                         attr->cat_atime = oa->o_atime;
1941                         valid |= CAT_ATIME;
1942                 }
1943                 if (oa->o_valid & OBD_MD_FLCTIME) {
1944                         attr->cat_ctime = oa->o_ctime;
1945                         valid |= CAT_CTIME;
1946                 }
1947                 if (valid != 0) {
1948                         cl_object_attr_lock(obj);
1949                         cl_object_attr_set(env, obj, attr, valid);
1950                         cl_object_attr_unlock(obj);
1951                 }
1952                 cl_object_put(env, obj);
1953         }
1954         OBDO_FREE(aa->aa_oa);
1955
1956         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1957                           req->rq_bulk->bd_nob_transferred);
1958         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1959         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1960
1961         client_obd_list_lock(&cli->cl_loi_list_lock);
1962         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1963          * is called so we know whether to go to sync BRWs or wait for more
1964          * RPCs to complete */
1965         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1966                 cli->cl_w_in_flight--;
1967         else
1968                 cli->cl_r_in_flight--;
1969         osc_wake_cache_waiters(cli);
1970         client_obd_list_unlock(&cli->cl_loi_list_lock);
1971
1972         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1973         RETURN(rc);
1974 }
1975
1976 /**
1977  * Build an RPC by the list of extent @ext_list. The caller must ensure
1978  * that the total pages in this list are NOT over max pages per RPC.
1979  * Extents in the list must be in OES_RPC state.
1980  */
1981 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1982                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
1983 {
1984         struct ptlrpc_request *req = NULL;
1985         struct osc_extent *ext;
1986         CFS_LIST_HEAD(rpc_list);
1987         struct brw_page **pga = NULL;
1988         struct osc_brw_async_args *aa = NULL;
1989         struct obdo *oa = NULL;
1990         struct osc_async_page *oap;
1991         struct osc_async_page *tmp;
1992         struct cl_req *clerq = NULL;
1993         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1994         struct ldlm_lock *lock = NULL;
1995         struct cl_req_attr crattr;
1996         obd_off starting_offset = OBD_OBJECT_EOF;
1997         obd_off ending_offset = 0;
1998         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
1999
2000         ENTRY;
2001         LASSERT(!cfs_list_empty(ext_list));
2002
2003         /* add pages into rpc_list to build BRW rpc */
2004         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2005                 LASSERT(ext->oe_state == OES_RPC);
2006                 mem_tight |= ext->oe_memalloc;
2007                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2008                         ++page_count;
2009                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2010                         if (starting_offset > oap->oap_obj_off)
2011                                 starting_offset = oap->oap_obj_off;
2012                         else
2013                                 LASSERT(oap->oap_page_off == 0);
2014                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2015                                 ending_offset = oap->oap_obj_off +
2016                                                 oap->oap_count;
2017                         else
2018                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2019                                         CFS_PAGE_SIZE);
2020                 }
2021         }
2022
2023         if (mem_tight)
2024                 mpflag = cfs_memory_pressure_get_and_set();
2025
2026         memset(&crattr, 0, sizeof crattr);
2027         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2028         if (pga == NULL)
2029                 GOTO(out, rc = -ENOMEM);
2030
2031         OBDO_ALLOC(oa);
2032         if (oa == NULL)
2033                 GOTO(out, rc = -ENOMEM);
2034
2035         i = 0;
2036         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2037                 struct cl_page *page = oap2cl_page(oap);
2038                 if (clerq == NULL) {
2039                         clerq = cl_req_alloc(env, page, crt,
2040                                              1 /* only 1-object rpcs for
2041                                                 * now */);
2042                         if (IS_ERR(clerq))
2043                                 GOTO(out, rc = PTR_ERR(clerq));
2044                         lock = oap->oap_ldlm_lock;
2045                 }
2046                 if (mem_tight)
2047                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2048                 pga[i] = &oap->oap_brw_page;
2049                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2050                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2051                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2052                 i++;
2053                 cl_req_page_add(env, clerq, page);
2054         }
2055
2056         /* always get the data for the obdo for the rpc */
2057         LASSERT(clerq != NULL);
2058         crattr.cra_oa = oa;
2059         crattr.cra_capa = NULL;
2060         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2061         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2062         if (lock) {
2063                 oa->o_handle = lock->l_remote_handle;
2064                 oa->o_valid |= OBD_MD_FLHANDLE;
2065         }
2066
2067         rc = cl_req_prep(env, clerq);
2068         if (rc != 0) {
2069                 CERROR("cl_req_prep failed: %d\n", rc);
2070                 GOTO(out, rc);
2071         }
2072
2073         sort_brw_pages(pga, page_count);
2074         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2075                         pga, &req, crattr.cra_capa, 1, 0);
2076         if (rc != 0) {
2077                 CERROR("prep_req failed: %d\n", rc);
2078                 GOTO(out, rc);
2079         }
2080
2081         req->rq_interpret_reply = brw_interpret;
2082         if (mem_tight != 0)
2083                 req->rq_memalloc = 1;
2084
2085         /* Need to update the timestamps after the request is built in case
2086          * we race with setattr (locally or in queue at OST).  If OST gets
2087          * later setattr before earlier BRW (as determined by the request xid),
2088          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2089          * way to do this in a single call.  bug 10150 */
2090         cl_req_attr_set(env, clerq, &crattr,
2091                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2092
2093         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2094
2095         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2096         aa = ptlrpc_req_async_args(req);
2097         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2098         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2099         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2100         cfs_list_splice_init(ext_list, &aa->aa_exts);
2101         aa->aa_clerq = clerq;
2102
2103         /* queued sync pages can be torn down while the pages
2104          * were between the pending list and the rpc */
2105         tmp = NULL;
2106         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2107                 /* only one oap gets a request reference */
2108                 if (tmp == NULL)
2109                         tmp = oap;
2110                 if (oap->oap_interrupted && !req->rq_intr) {
2111                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2112                                         oap, req);
2113                         ptlrpc_mark_interrupted(req);
2114                 }
2115         }
2116         if (tmp != NULL)
2117                 tmp->oap_request = ptlrpc_request_addref(req);
2118
2119         client_obd_list_lock(&cli->cl_loi_list_lock);
2120         starting_offset >>= CFS_PAGE_SHIFT;
2121         if (cmd == OBD_BRW_READ) {
2122                 cli->cl_r_in_flight++;
2123                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2124                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2125                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2126                                       starting_offset + 1);
2127         } else {
2128                 cli->cl_w_in_flight++;
2129                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2130                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2131                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2132                                       starting_offset + 1);
2133         }
2134         client_obd_list_unlock(&cli->cl_loi_list_lock);
2135
2136         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2137                   page_count, aa, cli->cl_r_in_flight,
2138                   cli->cl_w_in_flight);
2139
2140         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2141          * see which CPU/NUMA node the majority of pages were allocated
2142          * on, and try to assign the async RPC to the CPU core
2143          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2144          *
2145          * But on the other hand, we expect that multiple ptlrpcd
2146          * threads and the initial write sponsor can run in parallel,
2147          * especially when data checksum is enabled, which is CPU-bound
2148          * operation and single ptlrpcd thread cannot process in time.
2149          * So more ptlrpcd threads sharing BRW load
2150          * (with PDL_POLICY_ROUND) seems better.
2151          */
2152         ptlrpcd_add_req(req, pol, -1);
2153         rc = 0;
2154         EXIT;
2155
2156 out:
2157         if (mem_tight != 0)
2158                 cfs_memory_pressure_restore(mpflag);
2159
2160         capa_put(crattr.cra_capa);
2161         if (rc != 0) {
2162                 LASSERT(req == NULL);
2163
2164                 if (oa)
2165                         OBDO_FREE(oa);
2166                 if (pga)
2167                         OBD_FREE(pga, sizeof(*pga) * page_count);
2168                 /* this should happen rarely and is pretty bad, it makes the
2169                  * pending list not follow the dirty order */
2170                 while (!cfs_list_empty(ext_list)) {
2171                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2172                                              oe_link);
2173                         cfs_list_del_init(&ext->oe_link);
2174                         osc_extent_finish(env, ext, 0, rc);
2175                 }
2176                 if (clerq && !IS_ERR(clerq))
2177                         cl_req_completion(env, clerq, rc);
2178         }
2179         RETURN(rc);
2180 }
2181
2182 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2183                                         struct ldlm_enqueue_info *einfo)
2184 {
2185         void *data = einfo->ei_cbdata;
2186         int set = 0;
2187
2188         LASSERT(lock != NULL);
2189         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2190         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2191         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2192         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2193
2194         lock_res_and_lock(lock);
2195         cfs_spin_lock(&osc_ast_guard);
2196
2197         if (lock->l_ast_data == NULL)
2198                 lock->l_ast_data = data;
2199         if (lock->l_ast_data == data)
2200                 set = 1;
2201
2202         cfs_spin_unlock(&osc_ast_guard);
2203         unlock_res_and_lock(lock);
2204
2205         return set;
2206 }
2207
2208 static int osc_set_data_with_check(struct lustre_handle *lockh,
2209                                    struct ldlm_enqueue_info *einfo)
2210 {
2211         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2212         int set = 0;
2213
2214         if (lock != NULL) {
2215                 set = osc_set_lock_data_with_check(lock, einfo);
2216                 LDLM_LOCK_PUT(lock);
2217         } else
2218                 CERROR("lockh %p, data %p - client evicted?\n",
2219                        lockh, einfo->ei_cbdata);
2220         return set;
2221 }
2222
2223 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2224                              ldlm_iterator_t replace, void *data)
2225 {
2226         struct ldlm_res_id res_id;
2227         struct obd_device *obd = class_exp2obd(exp);
2228
2229         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2230         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2231         return 0;
2232 }
2233
2234 /* find any ldlm lock of the inode in osc
2235  * return 0    not find
2236  *        1    find one
2237  *      < 0    error */
2238 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2239                            ldlm_iterator_t replace, void *data)
2240 {
2241         struct ldlm_res_id res_id;
2242         struct obd_device *obd = class_exp2obd(exp);
2243         int rc = 0;
2244
2245         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2246         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2247         if (rc == LDLM_ITER_STOP)
2248                 return(1);
2249         if (rc == LDLM_ITER_CONTINUE)
2250                 return(0);
2251         return(rc);
2252 }
2253
2254 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2255                             obd_enqueue_update_f upcall, void *cookie,
2256                             int *flags, int agl, int rc)
2257 {
2258         int intent = *flags & LDLM_FL_HAS_INTENT;
2259         ENTRY;
2260
2261         if (intent) {
2262                 /* The request was created before ldlm_cli_enqueue call. */
2263                 if (rc == ELDLM_LOCK_ABORTED) {
2264                         struct ldlm_reply *rep;
2265                         rep = req_capsule_server_get(&req->rq_pill,
2266                                                      &RMF_DLM_REP);
2267
2268                         LASSERT(rep != NULL);
2269                         if (rep->lock_policy_res1)
2270                                 rc = rep->lock_policy_res1;
2271                 }
2272         }
2273
2274         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2275             (rc == 0)) {
2276                 *flags |= LDLM_FL_LVB_READY;
2277                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2278                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2279         }
2280
2281         /* Call the update callback. */
2282         rc = (*upcall)(cookie, rc);
2283         RETURN(rc);
2284 }
2285
2286 static int osc_enqueue_interpret(const struct lu_env *env,
2287                                  struct ptlrpc_request *req,
2288                                  struct osc_enqueue_args *aa, int rc)
2289 {
2290         struct ldlm_lock *lock;
2291         struct lustre_handle handle;
2292         __u32 mode;
2293         struct ost_lvb *lvb;
2294         __u32 lvb_len;
2295         int *flags = aa->oa_flags;
2296
2297         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2298          * might be freed anytime after lock upcall has been called. */
2299         lustre_handle_copy(&handle, aa->oa_lockh);
2300         mode = aa->oa_ei->ei_mode;
2301
2302         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2303          * be valid. */
2304         lock = ldlm_handle2lock(&handle);
2305
2306         /* Take an additional reference so that a blocking AST that
2307          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2308          * to arrive after an upcall has been executed by
2309          * osc_enqueue_fini(). */
2310         ldlm_lock_addref(&handle, mode);
2311
2312         /* Let CP AST to grant the lock first. */
2313         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2314
2315         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2316                 lvb = NULL;
2317                 lvb_len = 0;
2318         } else {
2319                 lvb = aa->oa_lvb;
2320                 lvb_len = sizeof(*aa->oa_lvb);
2321         }
2322
2323         /* Complete obtaining the lock procedure. */
2324         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2325                                    mode, flags, lvb, lvb_len, &handle, rc);
2326         /* Complete osc stuff. */
2327         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2328                               flags, aa->oa_agl, rc);
2329
2330         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2331
2332         /* Release the lock for async request. */
2333         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2334                 /*
2335                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2336                  * not already released by
2337                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2338                  */
2339                 ldlm_lock_decref(&handle, mode);
2340
2341         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2342                  aa->oa_lockh, req, aa);
2343         ldlm_lock_decref(&handle, mode);
2344         LDLM_LOCK_PUT(lock);
2345         return rc;
2346 }
2347
2348 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2349                         struct lov_oinfo *loi, int flags,
2350                         struct ost_lvb *lvb, __u32 mode, int rc)
2351 {
2352         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2353
2354         if (rc == ELDLM_OK) {
2355                 __u64 tmp;
2356
2357                 LASSERT(lock != NULL);
2358                 loi->loi_lvb = *lvb;
2359                 tmp = loi->loi_lvb.lvb_size;
2360                 /* Extend KMS up to the end of this lock and no further
2361                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2362                 if (tmp > lock->l_policy_data.l_extent.end)
2363                         tmp = lock->l_policy_data.l_extent.end + 1;
2364                 if (tmp >= loi->loi_kms) {
2365                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2366                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2367                         loi_kms_set(loi, tmp);
2368                 } else {
2369                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2370                                    LPU64"; leaving kms="LPU64", end="LPU64,
2371                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2372                                    lock->l_policy_data.l_extent.end);
2373                 }
2374                 ldlm_lock_allow_match(lock);
2375         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2376                 LASSERT(lock != NULL);
2377                 loi->loi_lvb = *lvb;
2378                 ldlm_lock_allow_match(lock);
2379                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2380                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2381                 rc = ELDLM_OK;
2382         }
2383
2384         if (lock != NULL) {
2385                 if (rc != ELDLM_OK)
2386                         ldlm_lock_fail_match(lock);
2387
2388                 LDLM_LOCK_PUT(lock);
2389         }
2390 }
2391 EXPORT_SYMBOL(osc_update_enqueue);
2392
/* Sentinel request-set pointer (never dereferenced in this file): when
 * osc_enqueue_base() is given this value as its rqset, the request is handed
 * to ptlrpcd_add_req() instead of being added to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2394
2395 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2396  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2397  * other synchronous requests, however keeping some locks and trying to obtain
2398  * others may take a considerable amount of time in a case of ost failure; and
2399  * when other sync requests do not get released lock from a client, the client
2400  * is excluded from the cluster -- such scenarious make the life difficult, so
2401  * release locks just after they are obtained. */
2402 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2403                      int *flags, ldlm_policy_data_t *policy,
2404                      struct ost_lvb *lvb, int kms_valid,
2405                      obd_enqueue_update_f upcall, void *cookie,
2406                      struct ldlm_enqueue_info *einfo,
2407                      struct lustre_handle *lockh,
2408                      struct ptlrpc_request_set *rqset, int async, int agl)
2409 {
2410         struct obd_device *obd = exp->exp_obd;
2411         struct ptlrpc_request *req = NULL;
2412         int intent = *flags & LDLM_FL_HAS_INTENT;
2413         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2414         ldlm_mode_t mode;
2415         int rc;
2416         ENTRY;
2417
2418         /* Filesystem lock extents are extended to page boundaries so that
2419          * dealing with the page cache is a little smoother.  */
2420         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2421         policy->l_extent.end |= ~CFS_PAGE_MASK;
2422
2423         /*
2424          * kms is not valid when either object is completely fresh (so that no
2425          * locks are cached), or object was evicted. In the latter case cached
2426          * lock cannot be used, because it would prime inode state with
2427          * potentially stale LVB.
2428          */
2429         if (!kms_valid)
2430                 goto no_match;
2431
2432         /* Next, search for already existing extent locks that will cover us */
2433         /* If we're trying to read, we also search for an existing PW lock.  The
2434          * VFS and page cache already protect us locally, so lots of readers/
2435          * writers can share a single PW lock.
2436          *
2437          * There are problems with conversion deadlocks, so instead of
2438          * converting a read lock to a write lock, we'll just enqueue a new
2439          * one.
2440          *
2441          * At some point we should cancel the read lock instead of making them
2442          * send us a blocking callback, but there are problems with canceling
2443          * locks out from other users right now, too. */
2444         mode = einfo->ei_mode;
2445         if (einfo->ei_mode == LCK_PR)
2446                 mode |= LCK_PW;
2447         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2448                                einfo->ei_type, policy, mode, lockh, 0);
2449         if (mode) {
2450                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2451
2452                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2453                         /* For AGL, if enqueue RPC is sent but the lock is not
2454                          * granted, then skip to process this strpe.
2455                          * Return -ECANCELED to tell the caller. */
2456                         ldlm_lock_decref(lockh, mode);
2457                         LDLM_LOCK_PUT(matched);
2458                         RETURN(-ECANCELED);
2459                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2460                         *flags |= LDLM_FL_LVB_READY;
2461                         /* addref the lock only if not async requests and PW
2462                          * lock is matched whereas we asked for PR. */
2463                         if (!rqset && einfo->ei_mode != mode)
2464                                 ldlm_lock_addref(lockh, LCK_PR);
2465                         if (intent) {
2466                                 /* I would like to be able to ASSERT here that
2467                                  * rss <= kms, but I can't, for reasons which
2468                                  * are explained in lov_enqueue() */
2469                         }
2470
2471                         /* We already have a lock, and it's referenced */
2472                         (*upcall)(cookie, ELDLM_OK);
2473
2474                         if (einfo->ei_mode != mode)
2475                                 ldlm_lock_decref(lockh, LCK_PW);
2476                         else if (rqset)
2477                                 /* For async requests, decref the lock. */
2478                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2479                         LDLM_LOCK_PUT(matched);
2480                         RETURN(ELDLM_OK);
2481                 } else {
2482                         ldlm_lock_decref(lockh, mode);
2483                         LDLM_LOCK_PUT(matched);
2484                 }
2485         }
2486
2487  no_match:
2488         if (intent) {
2489                 CFS_LIST_HEAD(cancels);
2490                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2491                                            &RQF_LDLM_ENQUEUE_LVB);
2492                 if (req == NULL)
2493                         RETURN(-ENOMEM);
2494
2495                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2496                 if (rc) {
2497                         ptlrpc_request_free(req);
2498                         RETURN(rc);
2499                 }
2500
2501                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2502                                      sizeof *lvb);
2503                 ptlrpc_request_set_replen(req);
2504         }
2505
2506         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2507         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2508
2509         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2510                               sizeof(*lvb), lockh, async);
2511         if (rqset) {
2512                 if (!rc) {
2513                         struct osc_enqueue_args *aa;
2514                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2515                         aa = ptlrpc_req_async_args(req);
2516                         aa->oa_ei = einfo;
2517                         aa->oa_exp = exp;
2518                         aa->oa_flags  = flags;
2519                         aa->oa_upcall = upcall;
2520                         aa->oa_cookie = cookie;
2521                         aa->oa_lvb    = lvb;
2522                         aa->oa_lockh  = lockh;
2523                         aa->oa_agl    = !!agl;
2524
2525                         req->rq_interpret_reply =
2526                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2527                         if (rqset == PTLRPCD_SET)
2528                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2529                         else
2530                                 ptlrpc_set_add_req(rqset, req);
2531                 } else if (intent) {
2532                         ptlrpc_req_finished(req);
2533                 }
2534                 RETURN(rc);
2535         }
2536
2537         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2538         if (intent)
2539                 ptlrpc_req_finished(req);
2540
2541         RETURN(rc);
2542 }
2543
2544 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2545                        struct ldlm_enqueue_info *einfo,
2546                        struct ptlrpc_request_set *rqset)
2547 {
2548         struct ldlm_res_id res_id;
2549         int rc;
2550         ENTRY;
2551
2552         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2553                            oinfo->oi_md->lsm_object_seq, &res_id);
2554
2555         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2556                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2557                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2558                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2559                               rqset, rqset != NULL, 0);
2560         RETURN(rc);
2561 }
2562
2563 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2564                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2565                    int *flags, void *data, struct lustre_handle *lockh,
2566                    int unref)
2567 {
2568         struct obd_device *obd = exp->exp_obd;
2569         int lflags = *flags;
2570         ldlm_mode_t rc;
2571         ENTRY;
2572
2573         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2574                 RETURN(-EIO);
2575
2576         /* Filesystem lock extents are extended to page boundaries so that
2577          * dealing with the page cache is a little smoother */
2578         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2579         policy->l_extent.end |= ~CFS_PAGE_MASK;
2580
2581         /* Next, search for already existing extent locks that will cover us */
2582         /* If we're trying to read, we also search for an existing PW lock.  The
2583          * VFS and page cache already protect us locally, so lots of readers/
2584          * writers can share a single PW lock. */
2585         rc = mode;
2586         if (mode == LCK_PR)
2587                 rc |= LCK_PW;
2588         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2589                              res_id, type, policy, rc, lockh, unref);
2590         if (rc) {
2591                 if (data != NULL) {
2592                         if (!osc_set_data_with_check(lockh, data)) {
2593                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2594                                         ldlm_lock_decref(lockh, rc);
2595                                 RETURN(0);
2596                         }
2597                 }
2598                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2599                         ldlm_lock_addref(lockh, LCK_PR);
2600                         ldlm_lock_decref(lockh, LCK_PW);
2601                 }
2602                 RETURN(rc);
2603         }
2604         RETURN(rc);
2605 }
2606
2607 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2608 {
2609         ENTRY;
2610
2611         if (unlikely(mode == LCK_GROUP))
2612                 ldlm_lock_decref_and_cancel(lockh, mode);
2613         else
2614                 ldlm_lock_decref(lockh, mode);
2615
2616         RETURN(0);
2617 }
2618
2619 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2620                       __u32 mode, struct lustre_handle *lockh)
2621 {
2622         ENTRY;
2623         RETURN(osc_cancel_base(lockh, mode));
2624 }
2625
2626 static int osc_cancel_unused(struct obd_export *exp,
2627                              struct lov_stripe_md *lsm,
2628                              ldlm_cancel_flags_t flags,
2629                              void *opaque)
2630 {
2631         struct obd_device *obd = class_exp2obd(exp);
2632         struct ldlm_res_id res_id, *resp = NULL;
2633
2634         if (lsm != NULL) {
2635                 resp = osc_build_res_name(lsm->lsm_object_id,
2636                                           lsm->lsm_object_seq, &res_id);
2637         }
2638
2639         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2640 }
2641
2642 static int osc_statfs_interpret(const struct lu_env *env,
2643                                 struct ptlrpc_request *req,
2644                                 struct osc_async_args *aa, int rc)
2645 {
2646         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2647         struct obd_statfs *msfs;
2648         __u64 used;
2649         ENTRY;
2650
2651         if (rc == -EBADR)
2652                 /* The request has in fact never been sent
2653                  * due to issues at a higher level (LOV).
2654                  * Exit immediately since the caller is
2655                  * aware of the problem and takes care
2656                  * of the clean up */
2657                  RETURN(rc);
2658
2659         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2660             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2661                 GOTO(out, rc = 0);
2662
2663         if (rc != 0)
2664                 GOTO(out, rc);
2665
2666         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2667         if (msfs == NULL) {
2668                 GOTO(out, rc = -EPROTO);
2669         }
2670
2671         /* Reinitialize the RDONLY and DEGRADED flags at the client
2672          * on each statfs, so they don't stay set permanently. */
2673         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2674
2675         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2676                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2677         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2678                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2679
2680         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2681                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2682         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2683                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2684
2685         /* Add a bit of hysteresis so this flag isn't continually flapping,
2686          * and ensure that new files don't get extremely fragmented due to
2687          * only a small amount of available space in the filesystem.
2688          * We want to set the NOSPC flag when there is less than ~0.1% free
2689          * and clear it when there is at least ~0.2% free space, so:
2690          *                   avail < ~0.1% max          max = avail + used
2691          *            1025 * avail < avail + used       used = blocks - free
2692          *            1024 * avail < used
2693          *            1024 * avail < blocks - free
2694          *                   avail < ((blocks - free) >> 10)
2695          *
2696          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2697          * lose that amount of space so in those cases we report no space left
2698          * if their is less than 1 GB left.                             */
2699         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2700         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2701                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2702                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2703         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2704                           (msfs->os_ffree > 64) &&
2705                           (msfs->os_bavail > (used << 1)))) {
2706                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2707                                              OSCC_FLAG_NOSPC_BLK);
2708         }
2709
2710         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2711                      (msfs->os_bavail < used)))
2712                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2713
2714         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2715
2716         *aa->aa_oi->oi_osfs = *msfs;
2717 out:
2718         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2719         RETURN(rc);
2720 }
2721
2722 static int osc_statfs_async(struct obd_export *exp,
2723                             struct obd_info *oinfo, __u64 max_age,
2724                             struct ptlrpc_request_set *rqset)
2725 {
2726         struct obd_device     *obd = class_exp2obd(exp);
2727         struct ptlrpc_request *req;
2728         struct osc_async_args *aa;
2729         int                    rc;
2730         ENTRY;
2731
2732         /* We could possibly pass max_age in the request (as an absolute
2733          * timestamp or a "seconds.usec ago") so the target can avoid doing
2734          * extra calls into the filesystem if that isn't necessary (e.g.
2735          * during mount that would help a bit).  Having relative timestamps
2736          * is not so great if request processing is slow, while absolute
2737          * timestamps are not ideal because they need time synchronization. */
2738         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2739         if (req == NULL)
2740                 RETURN(-ENOMEM);
2741
2742         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2743         if (rc) {
2744                 ptlrpc_request_free(req);
2745                 RETURN(rc);
2746         }
2747         ptlrpc_request_set_replen(req);
2748         req->rq_request_portal = OST_CREATE_PORTAL;
2749         ptlrpc_at_set_req_timeout(req);
2750
2751         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2752                 /* procfs requests not want stat in wait for avoid deadlock */
2753                 req->rq_no_resend = 1;
2754                 req->rq_no_delay = 1;
2755         }
2756
2757         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2758         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2759         aa = ptlrpc_req_async_args(req);
2760         aa->aa_oi = oinfo;
2761
2762         ptlrpc_set_add_req(rqset, req);
2763         RETURN(0);
2764 }
2765
2766 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2767                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2768 {
2769         struct obd_device     *obd = class_exp2obd(exp);
2770         struct obd_statfs     *msfs;
2771         struct ptlrpc_request *req;
2772         struct obd_import     *imp = NULL;
2773         int rc;
2774         ENTRY;
2775
2776         /*Since the request might also come from lprocfs, so we need
2777          *sync this with client_disconnect_export Bug15684*/
2778         cfs_down_read(&obd->u.cli.cl_sem);
2779         if (obd->u.cli.cl_import)
2780                 imp = class_import_get(obd->u.cli.cl_import);
2781         cfs_up_read(&obd->u.cli.cl_sem);
2782         if (!imp)
2783                 RETURN(-ENODEV);
2784
2785         /* We could possibly pass max_age in the request (as an absolute
2786          * timestamp or a "seconds.usec ago") so the target can avoid doing
2787          * extra calls into the filesystem if that isn't necessary (e.g.
2788          * during mount that would help a bit).  Having relative timestamps
2789          * is not so great if request processing is slow, while absolute
2790          * timestamps are not ideal because they need time synchronization. */
2791         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2792
2793         class_import_put(imp);
2794
2795         if (req == NULL)
2796                 RETURN(-ENOMEM);
2797
2798         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2799         if (rc) {
2800                 ptlrpc_request_free(req);
2801                 RETURN(rc);
2802         }
2803         ptlrpc_request_set_replen(req);
2804         req->rq_request_portal = OST_CREATE_PORTAL;
2805         ptlrpc_at_set_req_timeout(req);
2806
2807         if (flags & OBD_STATFS_NODELAY) {
2808                 /* procfs requests not want stat in wait for avoid deadlock */
2809                 req->rq_no_resend = 1;
2810                 req->rq_no_delay = 1;
2811         }
2812
2813         rc = ptlrpc_queue_wait(req);
2814         if (rc)
2815                 GOTO(out, rc);
2816
2817         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2818         if (msfs == NULL) {
2819                 GOTO(out, rc = -EPROTO);
2820         }
2821
2822         *osfs = *msfs;
2823
2824         EXIT;
2825  out:
2826         ptlrpc_req_finished(req);
2827         return rc;
2828 }
2829
2830 /* Retrieve object striping information.
2831  *
2832  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2833  * the maximum number of OST indices which will fit in the user buffer.
2834  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2835  */
2836 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2837 {
2838         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2839         struct lov_user_md_v3 lum, *lumk;
2840         struct lov_user_ost_data_v1 *lmm_objects;
2841         int rc = 0, lum_size;
2842         ENTRY;
2843
2844         if (!lsm)
2845                 RETURN(-ENODATA);
2846
2847         /* we only need the header part from user space to get lmm_magic and
2848          * lmm_stripe_count, (the header part is common to v1 and v3) */
2849         lum_size = sizeof(struct lov_user_md_v1);
2850         if (cfs_copy_from_user(&lum, lump, lum_size))
2851                 RETURN(-EFAULT);
2852
2853         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2854             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2855                 RETURN(-EINVAL);
2856
2857         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2858         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2859         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2860         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2861
2862         /* we can use lov_mds_md_size() to compute lum_size
2863          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2864         if (lum.lmm_stripe_count > 0) {
2865                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2866                 OBD_ALLOC(lumk, lum_size);
2867                 if (!lumk)
2868                         RETURN(-ENOMEM);
2869
2870                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2871                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2872                 else
2873                         lmm_objects = &(lumk->lmm_objects[0]);
2874                 lmm_objects->l_object_id = lsm->lsm_object_id;
2875         } else {
2876                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2877                 lumk = &lum;
2878         }
2879
2880         lumk->lmm_object_id = lsm->lsm_object_id;
2881         lumk->lmm_object_seq = lsm->lsm_object_seq;
2882         lumk->lmm_stripe_count = 1;
2883
2884         if (cfs_copy_to_user(lump, lumk, lum_size))
2885                 rc = -EFAULT;
2886
2887         if (lumk != &lum)
2888                 OBD_FREE(lumk, lum_size);
2889
2890         RETURN(rc);
2891 }
2892
2893
2894 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2895                          void *karg, void *uarg)
2896 {
2897         struct obd_device *obd = exp->exp_obd;
2898         struct obd_ioctl_data *data = karg;
2899         int err = 0;
2900         ENTRY;
2901
2902         if (!cfs_try_module_get(THIS_MODULE)) {
2903                 CERROR("Can't get module. Is it alive?");
2904                 return -EINVAL;
2905         }
2906         switch (cmd) {
2907         case OBD_IOC_LOV_GET_CONFIG: {
2908                 char *buf;
2909                 struct lov_desc *desc;
2910                 struct obd_uuid uuid;
2911
2912                 buf = NULL;
2913                 len = 0;
2914                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2915                         GOTO(out, err = -EINVAL);
2916
2917                 data = (struct obd_ioctl_data *)buf;
2918
2919                 if (sizeof(*desc) > data->ioc_inllen1) {
2920                         obd_ioctl_freedata(buf, len);
2921                         GOTO(out, err = -EINVAL);
2922                 }
2923
2924                 if (data->ioc_inllen2 < sizeof(uuid)) {
2925                         obd_ioctl_freedata(buf, len);
2926                         GOTO(out, err = -EINVAL);
2927                 }
2928
2929                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2930                 desc->ld_tgt_count = 1;
2931                 desc->ld_active_tgt_count = 1;
2932                 desc->ld_default_stripe_count = 1;
2933                 desc->ld_default_stripe_size = 0;
2934                 desc->ld_default_stripe_offset = 0;
2935                 desc->ld_pattern = 0;
2936                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2937
2938                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2939
2940                 err = cfs_copy_to_user((void *)uarg, buf, len);
2941                 if (err)
2942                         err = -EFAULT;
2943                 obd_ioctl_freedata(buf, len);
2944                 GOTO(out, err);
2945         }
2946         case LL_IOC_LOV_SETSTRIPE:
2947                 err = obd_alloc_memmd(exp, karg);
2948                 if (err > 0)
2949                         err = 0;
2950                 GOTO(out, err);
2951         case LL_IOC_LOV_GETSTRIPE:
2952                 err = osc_getstripe(karg, uarg);
2953                 GOTO(out, err);
2954         case OBD_IOC_CLIENT_RECOVER:
2955                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2956                                             data->ioc_inlbuf1, 0);
2957                 if (err > 0)
2958                         err = 0;
2959                 GOTO(out, err);
2960         case IOC_OSC_SET_ACTIVE:
2961                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2962                                                data->ioc_offset);
2963                 GOTO(out, err);
2964         case OBD_IOC_POLL_QUOTACHECK:
2965                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2966                 GOTO(out, err);
2967         case OBD_IOC_PING_TARGET:
2968                 err = ptlrpc_obd_ping(obd);
2969                 GOTO(out, err);
2970         default:
2971                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2972                        cmd, cfs_curproc_comm());
2973                 GOTO(out, err = -ENOTTY);
2974         }
2975 out:
2976         cfs_module_put(THIS_MODULE);
2977         return err;
2978 }
2979
2980 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2981                         obd_count keylen, void *key, __u32 *vallen, void *val,
2982                         struct lov_stripe_md *lsm)
2983 {
2984         ENTRY;
2985         if (!vallen || !val)
2986                 RETURN(-EFAULT);
2987
2988         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2989                 __u32 *stripe = val;
2990                 *vallen = sizeof(*stripe);
2991                 *stripe = 0;
2992                 RETURN(0);
2993         } else if (KEY_IS(KEY_LAST_ID)) {
2994                 struct ptlrpc_request *req;
2995                 obd_id                *reply;
2996                 char                  *tmp;
2997                 int                    rc;
2998
2999                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3000                                            &RQF_OST_GET_INFO_LAST_ID);
3001                 if (req == NULL)
3002                         RETURN(-ENOMEM);
3003
3004                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3005                                      RCL_CLIENT, keylen);
3006                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3007                 if (rc) {
3008                         ptlrpc_request_free(req);
3009                         RETURN(rc);
3010                 }
3011
3012                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3013                 memcpy(tmp, key, keylen);
3014
3015                 req->rq_no_delay = req->rq_no_resend = 1;
3016                 ptlrpc_request_set_replen(req);
3017                 rc = ptlrpc_queue_wait(req);
3018                 if (rc)
3019                         GOTO(out, rc);
3020
3021                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3022                 if (reply == NULL)
3023                         GOTO(out, rc = -EPROTO);
3024
3025                 *((obd_id *)val) = *reply;
3026         out:
3027                 ptlrpc_req_finished(req);
3028                 RETURN(rc);
3029         } else if (KEY_IS(KEY_FIEMAP)) {
3030                 struct ptlrpc_request *req;
3031                 struct ll_user_fiemap *reply;
3032                 char *tmp;
3033                 int rc;
3034
3035                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3036                                            &RQF_OST_GET_INFO_FIEMAP);
3037                 if (req == NULL)
3038                         RETURN(-ENOMEM);
3039
3040                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3041                                      RCL_CLIENT, keylen);
3042                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3043                                      RCL_CLIENT, *vallen);
3044                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3045                                      RCL_SERVER, *vallen);
3046
3047                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3048                 if (rc) {
3049                         ptlrpc_request_free(req);
3050                         RETURN(rc);
3051                 }
3052
3053                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3054                 memcpy(tmp, key, keylen);
3055                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3056                 memcpy(tmp, val, *vallen);
3057
3058                 ptlrpc_request_set_replen(req);
3059                 rc = ptlrpc_queue_wait(req);
3060                 if (rc)
3061                         GOTO(out1, rc);
3062
3063                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3064                 if (reply == NULL)
3065                         GOTO(out1, rc = -EPROTO);
3066
3067                 memcpy(val, reply, *vallen);
3068         out1:
3069                 ptlrpc_req_finished(req);
3070
3071                 RETURN(rc);
3072         }
3073
3074         RETURN(-EINVAL);
3075 }
3076
3077 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3078 {
3079         struct llog_ctxt *ctxt;
3080         int rc = 0;
3081         ENTRY;
3082
3083         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3084         if (ctxt) {
3085                 rc = llog_initiator_connect(ctxt);
3086                 llog_ctxt_put(ctxt);
3087         } else {
3088                 /* XXX return an error? skip setting below flags? */
3089         }
3090
3091         cfs_spin_lock(&imp->imp_lock);
3092         imp->imp_server_timeout = 1;
3093         imp->imp_pingable = 1;
3094         cfs_spin_unlock(&imp->imp_lock);
3095         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3096
3097         RETURN(rc);
3098 }
3099
3100 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3101                                           struct ptlrpc_request *req,
3102                                           void *aa, int rc)
3103 {
3104         ENTRY;
3105         if (rc != 0)
3106                 RETURN(rc);
3107
3108         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3109 }
3110
3111 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3112                               obd_count keylen, void *key, obd_count vallen,
3113                               void *val, struct ptlrpc_request_set *set)
3114 {
3115         struct ptlrpc_request *req;
3116         struct obd_device     *obd = exp->exp_obd;
3117         struct obd_import     *imp = class_exp2cliimp(exp);
3118         char                  *tmp;
3119         int                    rc;
3120         ENTRY;
3121
3122         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3123
3124         if (KEY_IS(KEY_NEXT_ID)) {
3125                 obd_id new_val;
3126                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3127
3128                 if (vallen != sizeof(obd_id))
3129                         RETURN(-ERANGE);
3130                 if (val == NULL)
3131                         RETURN(-EINVAL);
3132
3133                 if (vallen != sizeof(obd_id))
3134                         RETURN(-EINVAL);
3135
3136                 /* avoid race between allocate new object and set next id
3137                  * from ll_sync thread */
3138                 cfs_spin_lock(&oscc->oscc_lock);
3139                 new_val = *((obd_id*)val) + 1;
3140                 if (new_val > oscc->oscc_next_id)
3141                         oscc->oscc_next_id = new_val;
3142                 cfs_spin_unlock(&oscc->oscc_lock);
3143                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3144                        exp->exp_obd->obd_name,
3145                        obd->u.cli.cl_oscc.oscc_next_id);
3146
3147                 RETURN(0);
3148         }
3149
3150         if (KEY_IS(KEY_CHECKSUM)) {
3151                 if (vallen != sizeof(int))
3152                         RETURN(-EINVAL);
3153                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3154                 RETURN(0);
3155         }
3156
3157         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3158                 sptlrpc_conf_client_adapt(obd);
3159                 RETURN(0);
3160         }
3161
3162         if (KEY_IS(KEY_FLUSH_CTX)) {
3163                 sptlrpc_import_flush_my_ctx(imp);
3164                 RETURN(0);
3165         }
3166
3167         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3168                 RETURN(-EINVAL);
3169
3170         /* We pass all other commands directly to OST. Since nobody calls osc
3171            methods directly and everybody is supposed to go through LOV, we
3172            assume lov checked invalid values for us.
3173            The only recognised values so far are evict_by_nid and mds_conn.
3174            Even if something bad goes through, we'd get a -EINVAL from OST
3175            anyway. */
3176
3177         if (KEY_IS(KEY_GRANT_SHRINK))
3178                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3179         else
3180                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3181
3182         if (req == NULL)
3183                 RETURN(-ENOMEM);
3184
3185         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3186                              RCL_CLIENT, keylen);
3187         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3188                              RCL_CLIENT, vallen);
3189         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3190         if (rc) {
3191                 ptlrpc_request_free(req);
3192                 RETURN(rc);
3193         }
3194
3195         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3196         memcpy(tmp, key, keylen);
3197         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3198         memcpy(tmp, val, vallen);
3199
3200         if (KEY_IS(KEY_MDS_CONN)) {
3201                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3202
3203                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3204                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3205                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3206                 req->rq_no_delay = req->rq_no_resend = 1;
3207                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3208         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3209                 struct osc_grant_args *aa;
3210                 struct obdo *oa;
3211
3212                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3213                 aa = ptlrpc_req_async_args(req);
3214                 OBDO_ALLOC(oa);
3215                 if (!oa) {
3216                         ptlrpc_req_finished(req);
3217                         RETURN(-ENOMEM);
3218                 }
3219                 *oa = ((struct ost_body *)val)->oa;
3220                 aa->aa_oa = oa;
3221                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3222         }
3223
3224         ptlrpc_request_set_replen(req);
3225         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3226                 LASSERT(set != NULL);
3227                 ptlrpc_set_add_req(set, req);
3228                 ptlrpc_check_set(NULL, set);
3229         } else
3230                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3231
3232         RETURN(0);
3233 }
3234
3235
3236 static struct llog_operations osc_size_repl_logops = {
3237         lop_cancel: llog_obd_repl_cancel
3238 };
3239
3240 static struct llog_operations osc_mds_ost_orig_logops;
3241
3242 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3243                            struct obd_device *tgt, struct llog_catid *catid)
3244 {
3245         int rc;
3246         ENTRY;
3247
3248         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3249                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3250         if (rc) {
3251                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3252                 GOTO(out, rc);
3253         }
3254
3255         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3256                         NULL, &osc_size_repl_logops);
3257         if (rc) {
3258                 struct llog_ctxt *ctxt =
3259                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3260                 if (ctxt)
3261                         llog_cleanup(ctxt);
3262                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3263         }
3264         GOTO(out, rc);
3265 out:
3266         if (rc) {
3267                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3268                        obd->obd_name, tgt->obd_name, catid, rc);
3269                 CERROR("logid "LPX64":0x%x\n",
3270                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3271         }
3272         return rc;
3273 }
3274
3275 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3276                          struct obd_device *disk_obd, int *index)
3277 {
3278         struct llog_catid catid;
3279         static char name[32] = CATLIST;
3280         int rc;
3281         ENTRY;
3282
3283         LASSERT(olg == &obd->obd_olg);
3284
3285         cfs_mutex_lock(&olg->olg_cat_processing);
3286         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3287         if (rc) {
3288                 CERROR("rc: %d\n", rc);
3289                 GOTO(out, rc);
3290         }
3291
3292         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3293                obd->obd_name, *index, catid.lci_logid.lgl_oid,
3294                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3295
3296         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3297         if (rc) {
3298                 CERROR("rc: %d\n", rc);
3299                 GOTO(out, rc);
3300         }
3301
3302         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3303         if (rc) {
3304                 CERROR("rc: %d\n", rc);
3305                 GOTO(out, rc);
3306         }
3307
3308  out:
3309         cfs_mutex_unlock(&olg->olg_cat_processing);
3310
3311         return rc;
3312 }
3313
3314 static int osc_llog_finish(struct obd_device *obd, int count)
3315 {
3316         struct llog_ctxt *ctxt;
3317         int rc = 0, rc2 = 0;
3318         ENTRY;
3319
3320         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3321         if (ctxt)
3322                 rc = llog_cleanup(ctxt);
3323
3324         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3325         if (ctxt)
3326                 rc2 = llog_cleanup(ctxt);
3327         if (!rc)
3328                 rc = rc2;
3329
3330         RETURN(rc);
3331 }
3332
3333 static int osc_reconnect(const struct lu_env *env,
3334                          struct obd_export *exp, struct obd_device *obd,
3335                          struct obd_uuid *cluuid,
3336                          struct obd_connect_data *data,
3337                          void *localdata)
3338 {
3339         struct client_obd *cli = &obd->u.cli;
3340
3341         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3342                 long lost_grant;
3343
3344                 client_obd_list_lock(&cli->cl_loi_list_lock);
3345                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3346                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3347                 lost_grant = cli->cl_lost_grant;
3348                 cli->cl_lost_grant = 0;
3349                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3350
3351                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3352                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3353                        data->ocd_version, data->ocd_grant, lost_grant);
3354         }
3355
3356         RETURN(0);
3357 }
3358
3359 static int osc_disconnect(struct obd_export *exp)
3360 {
3361         struct obd_device *obd = class_exp2obd(exp);
3362         struct llog_ctxt  *ctxt;
3363         int rc;
3364
3365         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3366         if (ctxt) {
3367                 if (obd->u.cli.cl_conn_count == 1) {
3368                         /* Flush any remaining cancel messages out to the
3369                          * target */
3370                         llog_sync(ctxt, exp, 0);
3371                 }
3372                 llog_ctxt_put(ctxt);
3373         } else {
3374                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3375                        obd);
3376         }
3377
3378         rc = client_disconnect_export(exp);
3379         /**
3380          * Initially we put del_shrink_grant before disconnect_export, but it
3381          * causes the following problem if setup (connect) and cleanup
3382          * (disconnect) are tangled together.
3383          *      connect p1                     disconnect p2
3384          *   ptlrpc_connect_import
3385          *     ...............               class_manual_cleanup
3386          *                                     osc_disconnect
3387          *                                     del_shrink_grant
3388          *   ptlrpc_connect_interrupt
3389          *     init_grant_shrink
3390          *   add this client to shrink list
3391          *                                      cleanup_osc
3392          * Bang! pinger trigger the shrink.
3393          * So the osc should be disconnected from the shrink list, after we
3394          * are sure the import has been destroyed. BUG18662
3395          */
3396         if (obd->u.cli.cl_import == NULL)
3397                 osc_del_shrink_grant(&obd->u.cli);
3398         return rc;
3399 }
3400
3401 static int osc_import_event(struct obd_device *obd,
3402                             struct obd_import *imp,
3403                             enum obd_import_event event)
3404 {
3405         struct client_obd *cli;
3406         int rc = 0;
3407
3408         ENTRY;
3409         LASSERT(imp->imp_obd == obd);
3410
3411         switch (event) {
3412         case IMP_EVENT_DISCON: {
3413                 /* Only do this on the MDS OSC's */
3414                 if (imp->imp_server_timeout) {
3415                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3416
3417                         cfs_spin_lock(&oscc->oscc_lock);
3418                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3419                         cfs_spin_unlock(&oscc->oscc_lock);
3420                 }
3421                 cli = &obd->u.cli;
3422                 client_obd_list_lock(&cli->cl_loi_list_lock);
3423                 cli->cl_avail_grant = 0;
3424                 cli->cl_lost_grant = 0;
3425                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3426                 break;
3427         }
3428         case IMP_EVENT_INACTIVE: {
3429                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3430                 break;
3431         }
3432         case IMP_EVENT_INVALIDATE: {
3433                 struct ldlm_namespace *ns = obd->obd_namespace;
3434                 struct lu_env         *env;
3435                 int                    refcheck;
3436
3437                 env = cl_env_get(&refcheck);
3438                 if (!IS_ERR(env)) {
3439                         /* Reset grants */
3440                         cli = &obd->u.cli;
3441                         /* all pages go to failing rpcs due to the invalid
3442                          * import */
3443                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3444
3445                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3446                         cl_env_put(env, &refcheck);
3447                 } else
3448                         rc = PTR_ERR(env);
3449                 break;
3450         }
3451         case IMP_EVENT_ACTIVE: {
3452                 /* Only do this on the MDS OSC's */
3453                 if (imp->imp_server_timeout) {
3454                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3455
3456                         cfs_spin_lock(&oscc->oscc_lock);
3457                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3458                                               OSCC_FLAG_NOSPC_BLK);
3459                         cfs_spin_unlock(&oscc->oscc_lock);
3460                 }
3461                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3462                 break;
3463         }
3464         case IMP_EVENT_OCD: {
3465                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3466
3467                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3468                         osc_init_grant(&obd->u.cli, ocd);
3469
3470                 /* See bug 7198 */
3471                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3472                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3473
3474                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3475                 break;
3476         }
3477         case IMP_EVENT_DEACTIVATE: {
3478                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3479                 break;
3480         }
3481         case IMP_EVENT_ACTIVATE: {
3482                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3483                 break;
3484         }
3485         default:
3486                 CERROR("Unknown import event %d\n", event);
3487                 LBUG();
3488         }
3489         RETURN(rc);
3490 }
3491
3492 /**
3493  * Determine whether the lock can be canceled before replaying the lock
3494  * during recovery, see bug16774 for detailed information.
3495  *
3496  * \retval zero the lock can't be canceled
3497  * \retval other ok to cancel
3498  */
3499 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3500 {
3501         check_res_locked(lock->l_resource);
3502
3503         /*
3504          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3505          *
3506          * XXX as a future improvement, we can also cancel unused write lock
3507          * if it doesn't have dirty data and active mmaps.
3508          */
3509         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3510             (lock->l_granted_mode == LCK_PR ||
3511              lock->l_granted_mode == LCK_CR) &&
3512             (osc_dlm_lock_pageref(lock) == 0))
3513                 RETURN(1);
3514
3515         RETURN(0);
3516 }
3517
3518 static int brw_queue_work(const struct lu_env *env, void *data)
3519 {
3520         struct client_obd *cli = data;
3521
3522         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3523
3524         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3525         RETURN(0);
3526 }
3527
3528 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3529 {
3530         struct client_obd *cli = &obd->u.cli;
3531         int rc;
3532         ENTRY;
3533
3534         ENTRY;
3535         rc = ptlrpcd_addref();
3536         if (rc)
3537                 RETURN(rc);
3538
3539         rc = client_obd_setup(obd, lcfg);
3540         if (rc == 0) {
3541                 void *handler;
3542                 handler = ptlrpcd_alloc_work(cli->cl_import,
3543                                              brw_queue_work, cli);
3544                 if (!IS_ERR(handler))
3545                         cli->cl_writeback_work = handler;
3546                 else
3547                         rc = PTR_ERR(handler);
3548         }
3549
3550         if (rc == 0) {
3551                 struct lprocfs_static_vars lvars = { 0 };
3552
3553                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3554                 lprocfs_osc_init_vars(&lvars);
3555                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3556                         lproc_osc_attach_seqstat(obd);
3557                         sptlrpc_lprocfs_cliobd_attach(obd);
3558                         ptlrpc_lprocfs_register_obd(obd);
3559                 }
3560
3561                 oscc_init(obd);
3562                 /* We need to allocate a few requests more, because
3563                    brw_interpret tries to create new requests before freeing
3564                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3565                    reserved, but I afraid that might be too much wasted RAM
3566                    in fact, so 2 is just my guess and still should work. */
3567                 cli->cl_import->imp_rq_pool =
3568                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3569                                             OST_MAXREQSIZE,
3570                                             ptlrpc_add_rqs_to_pool);
3571
3572                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3573
3574                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3575         }
3576
3577         if (rc)
3578                 ptlrpcd_decref();
3579         RETURN(rc);
3580 }
3581
3582 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3583 {
3584         int rc = 0;
3585         ENTRY;
3586
3587         switch (stage) {
3588         case OBD_CLEANUP_EARLY: {
3589                 struct obd_import *imp;
3590                 imp = obd->u.cli.cl_import;
3591                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3592                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3593                 ptlrpc_deactivate_import(imp);
3594                 cfs_spin_lock(&imp->imp_lock);
3595                 imp->imp_pingable = 0;
3596                 cfs_spin_unlock(&imp->imp_lock);
3597                 break;
3598         }
3599         case OBD_CLEANUP_EXPORTS: {
3600                 struct client_obd *cli = &obd->u.cli;
3601                 /* LU-464
3602                  * for echo client, export may be on zombie list, wait for
3603                  * zombie thread to cull it, because cli.cl_import will be
3604                  * cleared in client_disconnect_export():
3605                  *   class_export_destroy() -> obd_cleanup() ->
3606                  *   echo_device_free() -> echo_client_cleanup() ->
3607                  *   obd_disconnect() -> osc_disconnect() ->
3608                  *   client_disconnect_export()
3609                  */
3610                 obd_zombie_barrier();
3611                 if (cli->cl_writeback_work) {
3612                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3613                         cli->cl_writeback_work = NULL;
3614                 }
3615                 obd_cleanup_client_import(obd);
3616                 ptlrpc_lprocfs_unregister_obd(obd);
3617                 lprocfs_obd_cleanup(obd);
3618                 rc = obd_llog_finish(obd, 0);
3619                 if (rc != 0)
3620                         CERROR("failed to cleanup llogging subsystems\n");
3621                 break;
3622                 }
3623         }
3624         RETURN(rc);
3625 }
3626
3627 int osc_cleanup(struct obd_device *obd)
3628 {
3629         int rc;
3630
3631         ENTRY;
3632
3633         /* free memory of osc quota cache */
3634         osc_quota_cleanup(obd);
3635
3636         rc = client_obd_cleanup(obd);
3637
3638         ptlrpcd_decref();
3639         RETURN(rc);
3640 }
3641
3642 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3643 {
3644         struct lprocfs_static_vars lvars = { 0 };
3645         int rc = 0;
3646
3647         lprocfs_osc_init_vars(&lvars);
3648
3649         switch (lcfg->lcfg_command) {
3650         default:
3651                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3652                                               lcfg, obd);
3653                 if (rc > 0)
3654                         rc = 0;
3655                 break;
3656         }
3657
3658         return(rc);
3659 }
3660
3661 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3662 {
3663         return osc_process_config_base(obd, buf);
3664 }
3665
3666 struct obd_ops osc_obd_ops = {
3667         .o_owner                = THIS_MODULE,
3668         .o_setup                = osc_setup,
3669         .o_precleanup           = osc_precleanup,
3670         .o_cleanup              = osc_cleanup,
3671         .o_add_conn             = client_import_add_conn,
3672         .o_del_conn             = client_import_del_conn,
3673         .o_connect              = client_connect_import,
3674         .o_reconnect            = osc_reconnect,
3675         .o_disconnect           = osc_disconnect,
3676         .o_statfs               = osc_statfs,
3677         .o_statfs_async         = osc_statfs_async,
3678         .o_packmd               = osc_packmd,
3679         .o_unpackmd             = osc_unpackmd,
3680         .o_precreate            = osc_precreate,
3681         .o_create               = osc_create,
3682         .o_create_async         = osc_create_async,
3683         .o_destroy              = osc_destroy,
3684         .o_getattr              = osc_getattr,
3685         .o_getattr_async        = osc_getattr_async,
3686         .o_setattr              = osc_setattr,
3687         .o_setattr_async        = osc_setattr_async,
3688         .o_brw                  = osc_brw,
3689         .o_punch                = osc_punch,
3690         .o_sync                 = osc_sync,
3691         .o_enqueue              = osc_enqueue,
3692         .o_change_cbdata        = osc_change_cbdata,
3693         .o_find_cbdata          = osc_find_cbdata,
3694         .o_cancel               = osc_cancel,
3695         .o_cancel_unused        = osc_cancel_unused,
3696         .o_iocontrol            = osc_iocontrol,
3697         .o_get_info             = osc_get_info,
3698         .o_set_info_async       = osc_set_info_async,
3699         .o_import_event         = osc_import_event,
3700         .o_llog_init            = osc_llog_init,
3701         .o_llog_finish          = osc_llog_finish,
3702         .o_process_config       = osc_process_config,
3703         .o_quotactl             = osc_quotactl,
3704         .o_quotacheck           = osc_quotacheck,
3705         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
3706 };
3707
3708 extern struct lu_kmem_descr osc_caches[];
3709 extern cfs_spinlock_t       osc_ast_guard;
3710 extern cfs_lock_class_key_t osc_ast_guard_class;
3711
3712 int __init osc_init(void)
3713 {
3714         struct lprocfs_static_vars lvars = { 0 };
3715         int rc;
3716         ENTRY;
3717
3718         /* print an address of _any_ initialized kernel symbol from this
3719          * module, to allow debugging with gdb that doesn't support data
3720          * symbols from modules.*/
3721         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3722
3723         rc = lu_kmem_init(osc_caches);
3724
3725         lprocfs_osc_init_vars(&lvars);
3726
3727         osc_quota_init();
3728         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3729                                  LUSTRE_OSC_NAME, &osc_device_type);
3730         if (rc) {
3731                 lu_kmem_fini(osc_caches);
3732                 RETURN(rc);
3733         }
3734
3735         cfs_spin_lock_init(&osc_ast_guard);
3736         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3737
3738         osc_mds_ost_orig_logops = llog_lvfs_ops;
3739         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3740         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3741         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3742         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3743
3744         RETURN(rc);
3745 }
3746
#ifdef __KERNEL__
/**
 * Module exit: shut down quota support, unregister the OSC obd type and
 * destroy the slab caches created by osc_init().
 */
static void /*__exit*/ osc_exit(void)
{
        osc_quota_exit();

        class_unregister_type(LUSTRE_OSC_NAME);

        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */