Whamcloud - gitweb
LU-1396 osc: control the RPC rate between MDS and OST
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
689 static int osc_can_send_destroy(struct client_obd *cli)
690 {
691         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692             cli->cl_max_rpcs_in_flight) {
693                 /* The destroy request can be sent */
694                 return 1;
695         }
696         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697             cli->cl_max_rpcs_in_flight) {
698                 /*
699                  * The counter has been modified between the two atomic
700                  * operations.
701                  */
702                 cfs_waitq_signal(&cli->cl_destroy_waitq);
703         }
704         return 0;
705 }
706
707 /* Destroy requests can be async always on the client, and we don't even really
708  * care about the return code since the client cannot do anything at all about
709  * a destroy failure.
710  * When the MDS is unlinking a filename, it saves the file objects into a
711  * recovery llog, and these object records are cancelled when the OST reports
712  * they were destroyed and sync'd to disk (i.e. transaction committed).
713  * If the client dies, or the OST is down when the object should be destroyed,
714  * the records are not cancelled, and when the OST reconnects to the MDS next,
715  * it will retrieve the llog unlink logs and then sends the log cancellation
716  * cookies to the MDS after committing destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718                        struct obdo *oa, struct lov_stripe_md *ea,
719                        struct obd_trans_info *oti, struct obd_export *md_export,
720                        void *capa)
721 {
722         struct client_obd     *cli = &exp->exp_obd->u.cli;
723         struct ptlrpc_request *req;
724         struct ost_body       *body;
725         CFS_LIST_HEAD(cancels);
726         int rc, count;
727         ENTRY;
728
729         if (!oa) {
730                 CDEBUG(D_INFO, "oa NULL\n");
731                 RETURN(-EINVAL);
732         }
733
734         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735                                         LDLM_FL_DISCARD_DATA);
736
737         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
738         if (req == NULL) {
739                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
740                 RETURN(-ENOMEM);
741         }
742
743         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
745                                0, &cancels, count);
746         if (rc) {
747                 ptlrpc_request_free(req);
748                 RETURN(rc);
749         }
750
751         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752         ptlrpc_at_set_req_timeout(req);
753
754         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755                 oa->o_lcookie = *oti->oti_logcookies;
756         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
757         LASSERT(body);
758         lustre_set_wire_obdo(&body->oa, oa);
759
760         osc_pack_capa(req, body, (struct obd_capa *)capa);
761         ptlrpc_request_set_replen(req);
762
763         /* If osc_destory is for destroying the unlink orphan,
764          * sent from MDT to OST, which should not be blocked here,
765          * because the process might be triggered by ptlrpcd, and
766          * it is not good to block ptlrpcd thread (b=16006)*/
767         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
768                 req->rq_interpret_reply = osc_destroy_interpret;
769                 if (!osc_can_send_destroy(cli)) {
770                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
771                                                           NULL);
772
773                         /*
774                          * Wait until the number of on-going destroy RPCs drops
775                          * under max_rpc_in_flight
776                          */
777                         l_wait_event_exclusive(cli->cl_destroy_waitq,
778                                                osc_can_send_destroy(cli), &lwi);
779                 }
780         }
781
782         /* Do not wait for response */
783         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
784         RETURN(0);
785 }
786
787 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
788                                 long writing_bytes)
789 {
790         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
791
792         LASSERT(!(oa->o_valid & bits));
793
794         oa->o_valid |= bits;
795         client_obd_list_lock(&cli->cl_loi_list_lock);
796         oa->o_dirty = cli->cl_dirty;
797         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
798                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
799                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
800                 oa->o_undirty = 0;
801         } else if (cfs_atomic_read(&obd_dirty_pages) -
802                    cfs_atomic_read(&obd_dirty_transit_pages) >
803                    obd_max_dirty_pages + 1){
804                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
805                  * not covered by a lock thus they may safely race and trip
806                  * this CERROR() unless we add in a small fudge factor (+1). */
807                 CERROR("dirty %d - %d > system dirty_max %d\n",
808                        cfs_atomic_read(&obd_dirty_pages),
809                        cfs_atomic_read(&obd_dirty_transit_pages),
810                        obd_max_dirty_pages);
811                 oa->o_undirty = 0;
812         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
813                 CERROR("dirty %lu - dirty_max %lu too big???\n",
814                        cli->cl_dirty, cli->cl_dirty_max);
815                 oa->o_undirty = 0;
816         } else {
817                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
818                                 (cli->cl_max_rpcs_in_flight + 1);
819                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
820         }
821         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
822         oa->o_dropped = cli->cl_lost_grant;
823         cli->cl_lost_grant = 0;
824         client_obd_list_unlock(&cli->cl_loi_list_lock);
825         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
826                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
827
828 }
829
830 void osc_update_next_shrink(struct client_obd *cli)
831 {
832         cli->cl_next_shrink_grant =
833                 cfs_time_shift(cli->cl_grant_shrink_interval);
834         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
835                cli->cl_next_shrink_grant);
836 }
837
838 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
839 {
840         client_obd_list_lock(&cli->cl_loi_list_lock);
841         cli->cl_avail_grant += grant;
842         client_obd_list_unlock(&cli->cl_loi_list_lock);
843 }
844
845 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
846 {
847         if (body->oa.o_valid & OBD_MD_FLGRANT) {
848                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
849                 __osc_update_grant(cli, body->oa.o_grant);
850         }
851 }
852
853 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
854                               obd_count keylen, void *key, obd_count vallen,
855                               void *val, struct ptlrpc_request_set *set);
856
857 static int osc_shrink_grant_interpret(const struct lu_env *env,
858                                       struct ptlrpc_request *req,
859                                       void *aa, int rc)
860 {
861         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
862         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
863         struct ost_body *body;
864
865         if (rc != 0) {
866                 __osc_update_grant(cli, oa->o_grant);
867                 GOTO(out, rc);
868         }
869
870         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
871         LASSERT(body);
872         osc_update_grant(cli, body);
873 out:
874         OBDO_FREE(oa);
875         return rc;
876 }
877
878 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
879 {
880         client_obd_list_lock(&cli->cl_loi_list_lock);
881         oa->o_grant = cli->cl_avail_grant / 4;
882         cli->cl_avail_grant -= oa->o_grant;
883         client_obd_list_unlock(&cli->cl_loi_list_lock);
884         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
885                 oa->o_valid |= OBD_MD_FLFLAGS;
886                 oa->o_flags = 0;
887         }
888         oa->o_flags |= OBD_FL_SHRINK_GRANT;
889         osc_update_next_shrink(cli);
890 }
891
892 /* Shrink the current grant, either from some large amount to enough for a
893  * full set of in-flight RPCs, or if we have already shrunk to that limit
894  * then to enough for a single RPC.  This avoids keeping more grant than
895  * needed, and avoids shrinking the grant piecemeal. */
896 static int osc_shrink_grant(struct client_obd *cli)
897 {
898         long target = (cli->cl_max_rpcs_in_flight + 1) *
899                       cli->cl_max_pages_per_rpc;
900
901         client_obd_list_lock(&cli->cl_loi_list_lock);
902         if (cli->cl_avail_grant <= target)
903                 target = cli->cl_max_pages_per_rpc;
904         client_obd_list_unlock(&cli->cl_loi_list_lock);
905
906         return osc_shrink_grant_to_target(cli, target);
907 }
908
909 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
910 {
911         int    rc = 0;
912         struct ost_body     *body;
913         ENTRY;
914
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         /* Don't shrink if we are already above or below the desired limit
917          * We don't want to shrink below a single RPC, as that will negatively
918          * impact block allocation and long-term performance. */
919         if (target < cli->cl_max_pages_per_rpc)
920                 target = cli->cl_max_pages_per_rpc;
921
922         if (target >= cli->cl_avail_grant) {
923                 client_obd_list_unlock(&cli->cl_loi_list_lock);
924                 RETURN(0);
925         }
926         client_obd_list_unlock(&cli->cl_loi_list_lock);
927
928         OBD_ALLOC_PTR(body);
929         if (!body)
930                 RETURN(-ENOMEM);
931
932         osc_announce_cached(cli, &body->oa, 0);
933
934         client_obd_list_lock(&cli->cl_loi_list_lock);
935         body->oa.o_grant = cli->cl_avail_grant - target;
936         cli->cl_avail_grant = target;
937         client_obd_list_unlock(&cli->cl_loi_list_lock);
938         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
939                 body->oa.o_valid |= OBD_MD_FLFLAGS;
940                 body->oa.o_flags = 0;
941         }
942         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
943         osc_update_next_shrink(cli);
944
945         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
946                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
947                                 sizeof(*body), body, NULL);
948         if (rc != 0)
949                 __osc_update_grant(cli, body->oa.o_grant);
950         OBD_FREE_PTR(body);
951         RETURN(rc);
952 }
953
954 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
955 static int osc_should_shrink_grant(struct client_obd *client)
956 {
957         cfs_time_t time = cfs_time_current();
958         cfs_time_t next_shrink = client->cl_next_shrink_grant;
959
960         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
961              OBD_CONNECT_GRANT_SHRINK) == 0)
962                 return 0;
963
964         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
965                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
966                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
967                         return 1;
968                 else
969                         osc_update_next_shrink(client);
970         }
971         return 0;
972 }
973
974 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
975 {
976         struct client_obd *client;
977
978         cfs_list_for_each_entry(client, &item->ti_obd_list,
979                                 cl_grant_shrink_list) {
980                 if (osc_should_shrink_grant(client))
981                         osc_shrink_grant(client);
982         }
983         return 0;
984 }
985
986 static int osc_add_shrink_grant(struct client_obd *client)
987 {
988         int rc;
989
990         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
991                                        TIMEOUT_GRANT,
992                                        osc_grant_shrink_grant_cb, NULL,
993                                        &client->cl_grant_shrink_list);
994         if (rc) {
995                 CERROR("add grant client %s error %d\n",
996                         client->cl_import->imp_obd->obd_name, rc);
997                 return rc;
998         }
999         CDEBUG(D_CACHE, "add grant client %s \n",
1000                client->cl_import->imp_obd->obd_name);
1001         osc_update_next_shrink(client);
1002         return 0;
1003 }
1004
1005 static int osc_del_shrink_grant(struct client_obd *client)
1006 {
1007         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1008                                          TIMEOUT_GRANT);
1009 }
1010
1011 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1012 {
1013         /*
1014          * ocd_grant is the total grant amount we're expect to hold: if we've
1015          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1016          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1017          *
1018          * race is tolerable here: if we're evicted, but imp_state already
1019          * left EVICTED state, then cl_dirty must be 0 already.
1020          */
1021         client_obd_list_lock(&cli->cl_loi_list_lock);
1022         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1023                 cli->cl_avail_grant = ocd->ocd_grant;
1024         else
1025                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1026
1027         if (cli->cl_avail_grant < 0) {
1028                 CWARN("%s: available grant < 0, the OSS is probably not running"
1029                       " with patch from bug20278 (%ld) \n",
1030                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1031                 /* workaround for 1.6 servers which do not have
1032                  * the patch from bug20278 */
1033                 cli->cl_avail_grant = ocd->ocd_grant;
1034         }
1035
1036         /* determine the appropriate chunk size used by osc_extent. */
1037         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1038         client_obd_list_unlock(&cli->cl_loi_list_lock);
1039
1040         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1041                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1042                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1043
1044         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1045             cfs_list_empty(&cli->cl_grant_shrink_list))
1046                 osc_add_shrink_grant(cli);
1047 }
1048
1049 /* We assume that the reason this OSC got a short read is because it read
1050  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1051  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1052  * this stripe never got written at or beyond this stripe offset yet. */
1053 static void handle_short_read(int nob_read, obd_count page_count,
1054                               struct brw_page **pga)
1055 {
1056         char *ptr;
1057         int i = 0;
1058
1059         /* skip bytes read OK */
1060         while (nob_read > 0) {
1061                 LASSERT (page_count > 0);
1062
1063                 if (pga[i]->count > nob_read) {
1064                         /* EOF inside this page */
1065                         ptr = cfs_kmap(pga[i]->pg) +
1066                                 (pga[i]->off & ~CFS_PAGE_MASK);
1067                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1068                         cfs_kunmap(pga[i]->pg);
1069                         page_count--;
1070                         i++;
1071                         break;
1072                 }
1073
1074                 nob_read -= pga[i]->count;
1075                 page_count--;
1076                 i++;
1077         }
1078
1079         /* zero remaining pages */
1080         while (page_count-- > 0) {
1081                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1082                 memset(ptr, 0, pga[i]->count);
1083                 cfs_kunmap(pga[i]->pg);
1084                 i++;
1085         }
1086 }
1087
1088 static int check_write_rcs(struct ptlrpc_request *req,
1089                            int requested_nob, int niocount,
1090                            obd_count page_count, struct brw_page **pga)
1091 {
1092         int     i;
1093         __u32   *remote_rcs;
1094
1095         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1096                                                   sizeof(*remote_rcs) *
1097                                                   niocount);
1098         if (remote_rcs == NULL) {
1099                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1100                 return(-EPROTO);
1101         }
1102
1103         /* return error if any niobuf was in error */
1104         for (i = 0; i < niocount; i++) {
1105                 if ((int)remote_rcs[i] < 0)
1106                         return(remote_rcs[i]);
1107
1108                 if (remote_rcs[i] != 0) {
1109                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1110                                 i, remote_rcs[i], req);
1111                         return(-EPROTO);
1112                 }
1113         }
1114
1115         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1116                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1117                        req->rq_bulk->bd_nob_transferred, requested_nob);
1118                 return(-EPROTO);
1119         }
1120
1121         return (0);
1122 }
1123
1124 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1125 {
1126         if (p1->flag != p2->flag) {
1127                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1128                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1129
1130                 /* warn if we try to combine flags that we don't know to be
1131                  * safe to combine */
1132                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1133                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1134                               "report this at http://bugs.whamcloud.com/\n",
1135                               p1->flag, p2->flag);
1136                 }
1137                 return 0;
1138         }
1139
1140         return (p1->off + p1->count == p2->off);
1141 }
1142
1143 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1144                                    struct brw_page **pga, int opc,
1145                                    cksum_type_t cksum_type)
1146 {
1147         __u32 cksum;
1148         int i = 0;
1149
1150         LASSERT (pg_count > 0);
1151         cksum = init_checksum(cksum_type);
1152         while (nob > 0 && pg_count > 0) {
1153                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1154                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1155                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1156
1157                 /* corrupt the data before we compute the checksum, to
1158                  * simulate an OST->client data error */
1159                 if (i == 0 && opc == OST_READ &&
1160                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1161                         memcpy(ptr + off, "bad1", min(4, nob));
1162                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1163                 cfs_kunmap(pga[i]->pg);
1164                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1165                                off, cksum);
1166
1167                 nob -= pga[i]->count;
1168                 pg_count--;
1169                 i++;
1170         }
1171         /* For sending we only compute the wrong checksum instead
1172          * of corrupting the data so it is still correct on a redo */
1173         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1174                 cksum++;
1175
1176         return fini_checksum(cksum, cksum_type);
1177 }
1178
1179 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1180                                 struct lov_stripe_md *lsm, obd_count page_count,
1181                                 struct brw_page **pga,
1182                                 struct ptlrpc_request **reqp,
1183                                 struct obd_capa *ocapa, int reserve,
1184                                 int resend)
1185 {
1186         struct ptlrpc_request   *req;
1187         struct ptlrpc_bulk_desc *desc;
1188         struct ost_body         *body;
1189         struct obd_ioobj        *ioobj;
1190         struct niobuf_remote    *niobuf;
1191         int niocount, i, requested_nob, opc, rc;
1192         struct osc_brw_async_args *aa;
1193         struct req_capsule      *pill;
1194         struct brw_page *pg_prev;
1195
1196         ENTRY;
1197         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1198                 RETURN(-ENOMEM); /* Recoverable */
1199         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1200                 RETURN(-EINVAL); /* Fatal */
1201
1202         if ((cmd & OBD_BRW_WRITE) != 0) {
1203                 opc = OST_WRITE;
1204                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1205                                                 cli->cl_import->imp_rq_pool,
1206                                                 &RQF_OST_BRW_WRITE);
1207         } else {
1208                 opc = OST_READ;
1209                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1210         }
1211         if (req == NULL)
1212                 RETURN(-ENOMEM);
1213
1214         for (niocount = i = 1; i < page_count; i++) {
1215                 if (!can_merge_pages(pga[i - 1], pga[i]))
1216                         niocount++;
1217         }
1218
1219         pill = &req->rq_pill;
1220         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1221                              sizeof(*ioobj));
1222         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1223                              niocount * sizeof(*niobuf));
1224         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1225
1226         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1227         if (rc) {
1228                 ptlrpc_request_free(req);
1229                 RETURN(rc);
1230         }
1231         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1232         ptlrpc_at_set_req_timeout(req);
1233
1234         if (opc == OST_WRITE)
1235                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1236                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1237         else
1238                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1239                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1240
1241         if (desc == NULL)
1242                 GOTO(out, rc = -ENOMEM);
1243         /* NB request now owns desc and will free it when it gets freed */
1244
1245         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1246         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1247         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1248         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1249
1250         lustre_set_wire_obdo(&body->oa, oa);
1251
1252         obdo_to_ioobj(oa, ioobj);
1253         ioobj->ioo_bufcnt = niocount;
1254         osc_pack_capa(req, body, ocapa);
1255         LASSERT (page_count > 0);
1256         pg_prev = pga[0];
1257         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1258                 struct brw_page *pg = pga[i];
1259                 int poff = pg->off & ~CFS_PAGE_MASK;
1260
1261                 LASSERT(pg->count > 0);
1262                 /* make sure there is no gap in the middle of page array */
1263                 LASSERTF(page_count == 1 ||
1264                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1265                           ergo(i > 0 && i < page_count - 1,
1266                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1267                           ergo(i == page_count - 1, poff == 0)),
1268                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1269                          i, page_count, pg, pg->off, pg->count);
1270 #ifdef __linux__
1271                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1272                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1273                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1274                          i, page_count,
1275                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1276                          pg_prev->pg, page_private(pg_prev->pg),
1277                          pg_prev->pg->index, pg_prev->off);
1278 #else
1279                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1280                          "i %d p_c %u\n", i, page_count);
1281 #endif
1282                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1283                         (pg->flag & OBD_BRW_SRVLOCK));
1284
1285                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1286                 requested_nob += pg->count;
1287
1288                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1289                         niobuf--;
1290                         niobuf->len += pg->count;
1291                 } else {
1292                         niobuf->offset = pg->off;
1293                         niobuf->len    = pg->count;
1294                         niobuf->flags  = pg->flag;
1295                 }
1296                 pg_prev = pg;
1297         }
1298
1299         LASSERTF((void *)(niobuf - niocount) ==
1300                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1301                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1302                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1303
1304         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1305         if (resend) {
1306                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1307                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1308                         body->oa.o_flags = 0;
1309                 }
1310                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1311         }
1312
1313         if (osc_should_shrink_grant(cli))
1314                 osc_shrink_grant_local(cli, &body->oa);
1315
1316         /* size[REQ_REC_OFF] still sizeof (*body) */
1317         if (opc == OST_WRITE) {
1318                 if (cli->cl_checksum &&
1319                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1320                         /* store cl_cksum_type in a local variable since
1321                          * it can be changed via lprocfs */
1322                         cksum_type_t cksum_type = cli->cl_cksum_type;
1323
1324                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1325                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1326                                 body->oa.o_flags = 0;
1327                         }
1328                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1329                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1330                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1331                                                              page_count, pga,
1332                                                              OST_WRITE,
1333                                                              cksum_type);
1334                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1335                                body->oa.o_cksum);
1336                         /* save this in 'oa', too, for later checking */
1337                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1338                         oa->o_flags |= cksum_type_pack(cksum_type);
1339                 } else {
1340                         /* clear out the checksum flag, in case this is a
1341                          * resend but cl_checksum is no longer set. b=11238 */
1342                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1343                 }
1344                 oa->o_cksum = body->oa.o_cksum;
1345                 /* 1 RC per niobuf */
1346                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1347                                      sizeof(__u32) * niocount);
1348         } else {
1349                 if (cli->cl_checksum &&
1350                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1351                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1352                                 body->oa.o_flags = 0;
1353                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1354                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1355                 }
1356         }
1357         ptlrpc_request_set_replen(req);
1358
1359         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1360         aa = ptlrpc_req_async_args(req);
1361         aa->aa_oa = oa;
1362         aa->aa_requested_nob = requested_nob;
1363         aa->aa_nio_count = niocount;
1364         aa->aa_page_count = page_count;
1365         aa->aa_resends = 0;
1366         aa->aa_ppga = pga;
1367         aa->aa_cli = cli;
1368         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1369         if (ocapa && reserve)
1370                 aa->aa_ocapa = capa_get(ocapa);
1371
1372         *reqp = req;
1373         RETURN(0);
1374
1375  out:
1376         ptlrpc_req_finished(req);
1377         RETURN(rc);
1378 }
1379
1380 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1381                                 __u32 client_cksum, __u32 server_cksum, int nob,
1382                                 obd_count page_count, struct brw_page **pga,
1383                                 cksum_type_t client_cksum_type)
1384 {
1385         __u32 new_cksum;
1386         char *msg;
1387         cksum_type_t cksum_type;
1388
1389         if (server_cksum == client_cksum) {
1390                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1391                 return 0;
1392         }
1393
1394         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1395                                        oa->o_flags : 0);
1396         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1397                                       cksum_type);
1398
1399         if (cksum_type != client_cksum_type)
1400                 msg = "the server did not use the checksum type specified in "
1401                       "the original request - likely a protocol problem";
1402         else if (new_cksum == server_cksum)
1403                 msg = "changed on the client after we checksummed it - "
1404                       "likely false positive due to mmap IO (bug 11742)";
1405         else if (new_cksum == client_cksum)
1406                 msg = "changed in transit before arrival at OST";
1407         else
1408                 msg = "changed in transit AND doesn't match the original - "
1409                       "likely false positive due to mmap IO (bug 11742)";
1410
1411         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1412                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1413                            msg, libcfs_nid2str(peer->nid),
1414                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1415                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1416                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1417                            oa->o_id,
1418                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1419                            pga[0]->off,
1420                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1421         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1422                "client csum now %x\n", client_cksum, client_cksum_type,
1423                server_cksum, cksum_type, new_cksum);
1424         return 1;
1425 }
1426
1427 /* Note rc enters this function as number of bytes transferred */
1428 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1429 {
1430         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1431         const lnet_process_id_t *peer =
1432                         &req->rq_import->imp_connection->c_peer;
1433         struct client_obd *cli = aa->aa_cli;
1434         struct ost_body *body;
1435         __u32 client_cksum = 0;
1436         ENTRY;
1437
1438         if (rc < 0 && rc != -EDQUOT) {
1439                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1440                 RETURN(rc);
1441         }
1442
1443         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1444         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1445         if (body == NULL) {
1446                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1447                 RETURN(-EPROTO);
1448         }
1449
1450         /* set/clear over quota flag for a uid/gid */
1451         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1452             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1453                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1454
1455                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1456                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1457                        body->oa.o_flags);
1458                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1459         }
1460
1461         osc_update_grant(cli, body);
1462
1463         if (rc < 0)
1464                 RETURN(rc);
1465
1466         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1467                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1468
1469         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1470                 if (rc > 0) {
1471                         CERROR("Unexpected +ve rc %d\n", rc);
1472                         RETURN(-EPROTO);
1473                 }
1474                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1475
1476                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1477                         RETURN(-EAGAIN);
1478
1479                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1480                     check_write_checksum(&body->oa, peer, client_cksum,
1481                                          body->oa.o_cksum, aa->aa_requested_nob,
1482                                          aa->aa_page_count, aa->aa_ppga,
1483                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1484                         RETURN(-EAGAIN);
1485
1486                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1487                                      aa->aa_page_count, aa->aa_ppga);
1488                 GOTO(out, rc);
1489         }
1490
1491         /* The rest of this function executes only for OST_READs */
1492
1493         /* if unwrap_bulk failed, return -EAGAIN to retry */
1494         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1495         if (rc < 0)
1496                 GOTO(out, rc = -EAGAIN);
1497
1498         if (rc > aa->aa_requested_nob) {
1499                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1500                        aa->aa_requested_nob);
1501                 RETURN(-EPROTO);
1502         }
1503
1504         if (rc != req->rq_bulk->bd_nob_transferred) {
1505                 CERROR ("Unexpected rc %d (%d transferred)\n",
1506                         rc, req->rq_bulk->bd_nob_transferred);
1507                 return (-EPROTO);
1508         }
1509
1510         if (rc < aa->aa_requested_nob)
1511                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1512
1513         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1514                 static int cksum_counter;
1515                 __u32      server_cksum = body->oa.o_cksum;
1516                 char      *via;
1517                 char      *router;
1518                 cksum_type_t cksum_type;
1519
1520                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1521                                                body->oa.o_flags : 0);
1522                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1523                                                  aa->aa_ppga, OST_READ,
1524                                                  cksum_type);
1525
1526                 if (peer->nid == req->rq_bulk->bd_sender) {
1527                         via = router = "";
1528                 } else {
1529                         via = " via ";
1530                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1531                 }
1532
1533                 if (server_cksum == ~0 && rc > 0) {
1534                         CERROR("Protocol error: server %s set the 'checksum' "
1535                                "bit, but didn't send a checksum.  Not fatal, "
1536                                "but please notify on http://bugs.whamcloud.com/\n",
1537                                libcfs_nid2str(peer->nid));
1538                 } else if (server_cksum != client_cksum) {
1539                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1540                                            "%s%s%s inode "DFID" object "
1541                                            LPU64"/"LPU64" extent "
1542                                            "["LPU64"-"LPU64"]\n",
1543                                            req->rq_import->imp_obd->obd_name,
1544                                            libcfs_nid2str(peer->nid),
1545                                            via, router,
1546                                            body->oa.o_valid & OBD_MD_FLFID ?
1547                                                 body->oa.o_parent_seq : (__u64)0,
1548                                            body->oa.o_valid & OBD_MD_FLFID ?
1549                                                 body->oa.o_parent_oid : 0,
1550                                            body->oa.o_valid & OBD_MD_FLFID ?
1551                                                 body->oa.o_parent_ver : 0,
1552                                            body->oa.o_id,
1553                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1554                                                 body->oa.o_seq : (__u64)0,
1555                                            aa->aa_ppga[0]->off,
1556                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1557                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1558                                                                         1);
1559                         CERROR("client %x, server %x, cksum_type %x\n",
1560                                client_cksum, server_cksum, cksum_type);
1561                         cksum_counter = 0;
1562                         aa->aa_oa->o_cksum = client_cksum;
1563                         rc = -EAGAIN;
1564                 } else {
1565                         cksum_counter++;
1566                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1567                         rc = 0;
1568                 }
1569         } else if (unlikely(client_cksum)) {
1570                 static int cksum_missed;
1571
1572                 cksum_missed++;
1573                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1574                         CERROR("Checksum %u requested from %s but not sent\n",
1575                                cksum_missed, libcfs_nid2str(peer->nid));
1576         } else {
1577                 rc = 0;
1578         }
1579 out:
1580         if (rc >= 0)
1581                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1582
1583         RETURN(rc);
1584 }
1585
1586 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1587                             struct lov_stripe_md *lsm,
1588                             obd_count page_count, struct brw_page **pga,
1589                             struct obd_capa *ocapa)
1590 {
1591         struct ptlrpc_request *req;
1592         int                    rc;
1593         cfs_waitq_t            waitq;
1594         int                    generation, resends = 0;
1595         struct l_wait_info     lwi;
1596
1597         ENTRY;
1598
1599         cfs_waitq_init(&waitq);
1600         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1601
1602 restart_bulk:
1603         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1604                                   page_count, pga, &req, ocapa, 0, resends);
1605         if (rc != 0)
1606                 return (rc);
1607
1608         if (resends) {
1609                 req->rq_generation_set = 1;
1610                 req->rq_import_generation = generation;
1611                 req->rq_sent = cfs_time_current_sec() + resends;
1612         }
1613
1614         rc = ptlrpc_queue_wait(req);
1615
1616         if (rc == -ETIMEDOUT && req->rq_resend) {
1617                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1618                 ptlrpc_req_finished(req);
1619                 goto restart_bulk;
1620         }
1621
1622         rc = osc_brw_fini_request(req, rc);
1623
1624         ptlrpc_req_finished(req);
1625         /* When server return -EINPROGRESS, client should always retry
1626          * regardless of the number of times the bulk was resent already.*/
1627         if (osc_recoverable_error(rc)) {
1628                 resends++;
1629                 if (rc != -EINPROGRESS &&
1630                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1631                         CERROR("%s: too many resend retries for object: "
1632                                ""LPU64":"LPU64", rc = %d.\n",
1633                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1634                         goto out;
1635                 }
1636                 if (generation !=
1637                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1638                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1639                                ""LPU64":"LPU64", rc = %d.\n",
1640                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1641                         goto out;
1642                 }
1643
1644                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1645                                        NULL);
1646                 l_wait_event(waitq, 0, &lwi);
1647
1648                 goto restart_bulk;
1649         }
1650 out:
1651         if (rc == -EAGAIN || rc == -EINPROGRESS)
1652                 rc = -EIO;
1653         RETURN (rc);
1654 }
1655
1656 int osc_brw_redo_request(struct ptlrpc_request *request,
1657                          struct osc_brw_async_args *aa)
1658 {
1659         struct ptlrpc_request *new_req;
1660         struct osc_brw_async_args *new_aa;
1661         struct osc_async_page *oap;
1662         int rc = 0;
1663         ENTRY;
1664
1665         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1666
1667         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1668                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1669                                   aa->aa_cli, aa->aa_oa,
1670                                   NULL /* lsm unused by osc currently */,
1671                                   aa->aa_page_count, aa->aa_ppga,
1672                                   &new_req, aa->aa_ocapa, 0, 1);
1673         if (rc)
1674                 RETURN(rc);
1675
1676         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1677                 if (oap->oap_request != NULL) {
1678                         LASSERTF(request == oap->oap_request,
1679                                  "request %p != oap_request %p\n",
1680                                  request, oap->oap_request);
1681                         if (oap->oap_interrupted) {
1682                                 ptlrpc_req_finished(new_req);
1683                                 RETURN(-EINTR);
1684                         }
1685                 }
1686         }
1687         /* New request takes over pga and oaps from old request.
1688          * Note that copying a list_head doesn't work, need to move it... */
1689         aa->aa_resends++;
1690         new_req->rq_interpret_reply = request->rq_interpret_reply;
1691         new_req->rq_async_args = request->rq_async_args;
1692         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1693         new_req->rq_generation_set = 1;
1694         new_req->rq_import_generation = request->rq_import_generation;
1695
1696         new_aa = ptlrpc_req_async_args(new_req);
1697
1698         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1699         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1700         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1701         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1702
1703         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1704                 if (oap->oap_request) {
1705                         ptlrpc_req_finished(oap->oap_request);
1706                         oap->oap_request = ptlrpc_request_addref(new_req);
1707                 }
1708         }
1709
1710         new_aa->aa_ocapa = aa->aa_ocapa;
1711         aa->aa_ocapa = NULL;
1712
1713         /* XXX: This code will run into problem if we're going to support
1714          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1715          * and wait for all of them to be finished. We should inherit request
1716          * set from old request. */
1717         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1718
1719         DEBUG_REQ(D_INFO, new_req, "new request");
1720         RETURN(0);
1721 }
1722
1723 /*
1724  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1725  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1726  * fine for our small page arrays and doesn't require allocation.  its an
1727  * insertion sort that swaps elements that are strides apart, shrinking the
1728  * stride down until its '1' and the array is sorted.
1729  */
1730 static void sort_brw_pages(struct brw_page **array, int num)
1731 {
1732         int stride, i, j;
1733         struct brw_page *tmp;
1734
1735         if (num == 1)
1736                 return;
1737         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1738                 ;
1739
1740         do {
1741                 stride /= 3;
1742                 for (i = stride ; i < num ; i++) {
1743                         tmp = array[i];
1744                         j = i;
1745                         while (j >= stride && array[j - stride]->off > tmp->off) {
1746                                 array[j] = array[j - stride];
1747                                 j -= stride;
1748                         }
1749                         array[j] = tmp;
1750                 }
1751         } while (stride > 1);
1752 }
1753
1754 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1755 {
1756         int count = 1;
1757         int offset;
1758         int i = 0;
1759
1760         LASSERT (pages > 0);
1761         offset = pg[i]->off & ~CFS_PAGE_MASK;
1762
1763         for (;;) {
1764                 pages--;
1765                 if (pages == 0)         /* that's all */
1766                         return count;
1767
1768                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1769                         return count;   /* doesn't end on page boundary */
1770
1771                 i++;
1772                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1773                 if (offset != 0)        /* doesn't start on page boundary */
1774                         return count;
1775
1776                 count++;
1777         }
1778 }
1779
1780 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1781 {
1782         struct brw_page **ppga;
1783         int i;
1784
1785         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1786         if (ppga == NULL)
1787                 return NULL;
1788
1789         for (i = 0; i < count; i++)
1790                 ppga[i] = pga + i;
1791         return ppga;
1792 }
1793
1794 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1795 {
1796         LASSERT(ppga != NULL);
1797         OBD_FREE(ppga, sizeof(*ppga) * count);
1798 }
1799
1800 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1801                    obd_count page_count, struct brw_page *pga,
1802                    struct obd_trans_info *oti)
1803 {
1804         struct obdo *saved_oa = NULL;
1805         struct brw_page **ppga, **orig;
1806         struct obd_import *imp = class_exp2cliimp(exp);
1807         struct client_obd *cli;
1808         int rc, page_count_orig;
1809         ENTRY;
1810
1811         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1812         cli = &imp->imp_obd->u.cli;
1813
1814         if (cmd & OBD_BRW_CHECK) {
1815                 /* The caller just wants to know if there's a chance that this
1816                  * I/O can succeed */
1817
1818                 if (imp->imp_invalid)
1819                         RETURN(-EIO);
1820                 RETURN(0);
1821         }
1822
1823         /* test_brw with a failed create can trip this, maybe others. */
1824         LASSERT(cli->cl_max_pages_per_rpc);
1825
1826         rc = 0;
1827
1828         orig = ppga = osc_build_ppga(pga, page_count);
1829         if (ppga == NULL)
1830                 RETURN(-ENOMEM);
1831         page_count_orig = page_count;
1832
1833         sort_brw_pages(ppga, page_count);
1834         while (page_count) {
1835                 obd_count pages_per_brw;
1836
1837                 if (page_count > cli->cl_max_pages_per_rpc)
1838                         pages_per_brw = cli->cl_max_pages_per_rpc;
1839                 else
1840                         pages_per_brw = page_count;
1841
1842                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1843
1844                 if (saved_oa != NULL) {
1845                         /* restore previously saved oa */
1846                         *oinfo->oi_oa = *saved_oa;
1847                 } else if (page_count > pages_per_brw) {
1848                         /* save a copy of oa (brw will clobber it) */
1849                         OBDO_ALLOC(saved_oa);
1850                         if (saved_oa == NULL)
1851                                 GOTO(out, rc = -ENOMEM);
1852                         *saved_oa = *oinfo->oi_oa;
1853                 }
1854
1855                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1856                                       pages_per_brw, ppga, oinfo->oi_capa);
1857
1858                 if (rc != 0)
1859                         break;
1860
1861                 page_count -= pages_per_brw;
1862                 ppga += pages_per_brw;
1863         }
1864
1865 out:
1866         osc_release_ppga(orig, page_count_orig);
1867
1868         if (saved_oa != NULL)
1869                 OBDO_FREE(saved_oa);
1870
1871         RETURN(rc);
1872 }
1873
1874 static int brw_interpret(const struct lu_env *env,
1875                          struct ptlrpc_request *req, void *data, int rc)
1876 {
1877         struct osc_brw_async_args *aa = data;
1878         struct osc_extent *ext;
1879         struct osc_extent *tmp;
1880         struct cl_object  *obj = NULL;
1881         struct client_obd *cli = aa->aa_cli;
1882         ENTRY;
1883
1884         rc = osc_brw_fini_request(req, rc);
1885         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1886         /* When server return -EINPROGRESS, client should always retry
1887          * regardless of the number of times the bulk was resent already. */
1888         if (osc_recoverable_error(rc)) {
1889                 if (req->rq_import_generation !=
1890                     req->rq_import->imp_generation) {
1891                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1892                                ""LPU64":"LPU64", rc = %d.\n",
1893                                req->rq_import->imp_obd->obd_name,
1894                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1895                 } else if (rc == -EINPROGRESS ||
1896                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1897                         rc = osc_brw_redo_request(req, aa);
1898                 } else {
1899                         CERROR("%s: too many resent retries for object: "
1900                                ""LPU64":"LPU64", rc = %d.\n",
1901                                req->rq_import->imp_obd->obd_name,
1902                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1903                 }
1904
1905                 if (rc == 0)
1906                         RETURN(0);
1907                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1908                         rc = -EIO;
1909         }
1910
1911         if (aa->aa_ocapa) {
1912                 capa_put(aa->aa_ocapa);
1913                 aa->aa_ocapa = NULL;
1914         }
1915
1916         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1917                 if (obj == NULL && rc == 0) {
1918                         obj = osc2cl(ext->oe_obj);
1919                         cl_object_get(obj);
1920                 }
1921
1922                 cfs_list_del_init(&ext->oe_link);
1923                 osc_extent_finish(env, ext, 1, rc);
1924         }
1925         LASSERT(cfs_list_empty(&aa->aa_exts));
1926         LASSERT(cfs_list_empty(&aa->aa_oaps));
1927
1928         if (obj != NULL) {
1929                 struct obdo *oa = aa->aa_oa;
1930                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1931                 unsigned long valid = 0;
1932
1933                 LASSERT(rc == 0);
1934                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1935                         attr->cat_blocks = oa->o_blocks;
1936                         valid |= CAT_BLOCKS;
1937                 }
1938                 if (oa->o_valid & OBD_MD_FLMTIME) {
1939                         attr->cat_mtime = oa->o_mtime;
1940                         valid |= CAT_MTIME;
1941                 }
1942                 if (oa->o_valid & OBD_MD_FLATIME) {
1943                         attr->cat_atime = oa->o_atime;
1944                         valid |= CAT_ATIME;
1945                 }
1946                 if (oa->o_valid & OBD_MD_FLCTIME) {
1947                         attr->cat_ctime = oa->o_ctime;
1948                         valid |= CAT_CTIME;
1949                 }
1950                 if (valid != 0) {
1951                         cl_object_attr_lock(obj);
1952                         cl_object_attr_set(env, obj, attr, valid);
1953                         cl_object_attr_unlock(obj);
1954                 }
1955                 cl_object_put(env, obj);
1956         }
1957         OBDO_FREE(aa->aa_oa);
1958
1959         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1960                           req->rq_bulk->bd_nob_transferred);
1961         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1962         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1963
1964         client_obd_list_lock(&cli->cl_loi_list_lock);
1965         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1966          * is called so we know whether to go to sync BRWs or wait for more
1967          * RPCs to complete */
1968         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1969                 cli->cl_w_in_flight--;
1970         else
1971                 cli->cl_r_in_flight--;
1972         osc_wake_cache_waiters(cli);
1973         client_obd_list_unlock(&cli->cl_loi_list_lock);
1974
1975         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1976         RETURN(rc);
1977 }
1978
1979 /**
1980  * Build an RPC by the list of extent @ext_list. The caller must ensure
1981  * that the total pages in this list are NOT over max pages per RPC.
1982  * Extents in the list must be in OES_RPC state.
1983  */
1984 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1985                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
1986 {
1987         struct ptlrpc_request *req = NULL;
1988         struct osc_extent *ext;
1989         CFS_LIST_HEAD(rpc_list);
1990         struct brw_page **pga = NULL;
1991         struct osc_brw_async_args *aa = NULL;
1992         struct obdo *oa = NULL;
1993         struct osc_async_page *oap;
1994         struct osc_async_page *tmp;
1995         struct cl_req *clerq = NULL;
1996         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1997         struct ldlm_lock *lock = NULL;
1998         struct cl_req_attr crattr;
1999         obd_off starting_offset = OBD_OBJECT_EOF;
2000         obd_off ending_offset = 0;
2001         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2002
2003         ENTRY;
2004         LASSERT(!cfs_list_empty(ext_list));
2005
2006         /* add pages into rpc_list to build BRW rpc */
2007         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2008                 LASSERT(ext->oe_state == OES_RPC);
2009                 mem_tight |= ext->oe_memalloc;
2010                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2011                         ++page_count;
2012                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2013                         if (starting_offset > oap->oap_obj_off)
2014                                 starting_offset = oap->oap_obj_off;
2015                         else
2016                                 LASSERT(oap->oap_page_off == 0);
2017                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2018                                 ending_offset = oap->oap_obj_off +
2019                                                 oap->oap_count;
2020                         else
2021                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2022                                         CFS_PAGE_SIZE);
2023                 }
2024         }
2025
2026         if (mem_tight)
2027                 mpflag = cfs_memory_pressure_get_and_set();
2028
2029         memset(&crattr, 0, sizeof crattr);
2030         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2031         if (pga == NULL)
2032                 GOTO(out, rc = -ENOMEM);
2033
2034         OBDO_ALLOC(oa);
2035         if (oa == NULL)
2036                 GOTO(out, rc = -ENOMEM);
2037
2038         i = 0;
2039         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2040                 struct cl_page *page = oap2cl_page(oap);
2041                 if (clerq == NULL) {
2042                         clerq = cl_req_alloc(env, page, crt,
2043                                              1 /* only 1-object rpcs for
2044                                                 * now */);
2045                         if (IS_ERR(clerq))
2046                                 GOTO(out, rc = PTR_ERR(clerq));
2047                         lock = oap->oap_ldlm_lock;
2048                 }
2049                 if (mem_tight)
2050                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2051                 pga[i] = &oap->oap_brw_page;
2052                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2053                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2054                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2055                 i++;
2056                 cl_req_page_add(env, clerq, page);
2057         }
2058
2059         /* always get the data for the obdo for the rpc */
2060         LASSERT(clerq != NULL);
2061         crattr.cra_oa = oa;
2062         crattr.cra_capa = NULL;
2063         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2064         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2065         if (lock) {
2066                 oa->o_handle = lock->l_remote_handle;
2067                 oa->o_valid |= OBD_MD_FLHANDLE;
2068         }
2069
2070         rc = cl_req_prep(env, clerq);
2071         if (rc != 0) {
2072                 CERROR("cl_req_prep failed: %d\n", rc);
2073                 GOTO(out, rc);
2074         }
2075
2076         sort_brw_pages(pga, page_count);
2077         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2078                         pga, &req, crattr.cra_capa, 1, 0);
2079         if (rc != 0) {
2080                 CERROR("prep_req failed: %d\n", rc);
2081                 GOTO(out, rc);
2082         }
2083
2084         req->rq_interpret_reply = brw_interpret;
2085         if (mem_tight != 0)
2086                 req->rq_memalloc = 1;
2087
2088         /* Need to update the timestamps after the request is built in case
2089          * we race with setattr (locally or in queue at OST).  If OST gets
2090          * later setattr before earlier BRW (as determined by the request xid),
2091          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2092          * way to do this in a single call.  bug 10150 */
2093         cl_req_attr_set(env, clerq, &crattr,
2094                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2095
2096         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2097
2098         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2099         aa = ptlrpc_req_async_args(req);
2100         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2101         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2102         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2103         cfs_list_splice_init(ext_list, &aa->aa_exts);
2104         aa->aa_clerq = clerq;
2105
2106         /* queued sync pages can be torn down while the pages
2107          * were between the pending list and the rpc */
2108         tmp = NULL;
2109         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2110                 /* only one oap gets a request reference */
2111                 if (tmp == NULL)
2112                         tmp = oap;
2113                 if (oap->oap_interrupted && !req->rq_intr) {
2114                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2115                                         oap, req);
2116                         ptlrpc_mark_interrupted(req);
2117                 }
2118         }
2119         if (tmp != NULL)
2120                 tmp->oap_request = ptlrpc_request_addref(req);
2121
2122         client_obd_list_lock(&cli->cl_loi_list_lock);
2123         starting_offset >>= CFS_PAGE_SHIFT;
2124         if (cmd == OBD_BRW_READ) {
2125                 cli->cl_r_in_flight++;
2126                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2127                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2128                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2129                                       starting_offset + 1);
2130         } else {
2131                 cli->cl_w_in_flight++;
2132                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2133                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2134                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2135                                       starting_offset + 1);
2136         }
2137         client_obd_list_unlock(&cli->cl_loi_list_lock);
2138
2139         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2140                   page_count, aa, cli->cl_r_in_flight,
2141                   cli->cl_w_in_flight);
2142
2143         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2144          * see which CPU/NUMA node the majority of pages were allocated
2145          * on, and try to assign the async RPC to the CPU core
2146          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2147          *
2148          * But on the other hand, we expect that multiple ptlrpcd
2149          * threads and the initial write sponsor can run in parallel,
2150          * especially when data checksum is enabled, which is CPU-bound
2151          * operation and single ptlrpcd thread cannot process in time.
2152          * So more ptlrpcd threads sharing BRW load
2153          * (with PDL_POLICY_ROUND) seems better.
2154          */
2155         ptlrpcd_add_req(req, pol, -1);
2156         rc = 0;
2157         EXIT;
2158
2159 out:
2160         if (mem_tight != 0)
2161                 cfs_memory_pressure_restore(mpflag);
2162
2163         capa_put(crattr.cra_capa);
2164         if (rc != 0) {
2165                 LASSERT(req == NULL);
2166
2167                 if (oa)
2168                         OBDO_FREE(oa);
2169                 if (pga)
2170                         OBD_FREE(pga, sizeof(*pga) * page_count);
2171                 /* this should happen rarely and is pretty bad, it makes the
2172                  * pending list not follow the dirty order */
2173                 while (!cfs_list_empty(ext_list)) {
2174                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2175                                              oe_link);
2176                         cfs_list_del_init(&ext->oe_link);
2177                         osc_extent_finish(env, ext, 0, rc);
2178                 }
2179                 if (clerq && !IS_ERR(clerq))
2180                         cl_req_completion(env, clerq, rc);
2181         }
2182         RETURN(rc);
2183 }
2184
2185 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2186                                         struct ldlm_enqueue_info *einfo)
2187 {
2188         void *data = einfo->ei_cbdata;
2189         int set = 0;
2190
2191         LASSERT(lock != NULL);
2192         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2193         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2194         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2195         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2196
2197         lock_res_and_lock(lock);
2198         cfs_spin_lock(&osc_ast_guard);
2199
2200         if (lock->l_ast_data == NULL)
2201                 lock->l_ast_data = data;
2202         if (lock->l_ast_data == data)
2203                 set = 1;
2204
2205         cfs_spin_unlock(&osc_ast_guard);
2206         unlock_res_and_lock(lock);
2207
2208         return set;
2209 }
2210
2211 static int osc_set_data_with_check(struct lustre_handle *lockh,
2212                                    struct ldlm_enqueue_info *einfo)
2213 {
2214         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2215         int set = 0;
2216
2217         if (lock != NULL) {
2218                 set = osc_set_lock_data_with_check(lock, einfo);
2219                 LDLM_LOCK_PUT(lock);
2220         } else
2221                 CERROR("lockh %p, data %p - client evicted?\n",
2222                        lockh, einfo->ei_cbdata);
2223         return set;
2224 }
2225
2226 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2227                              ldlm_iterator_t replace, void *data)
2228 {
2229         struct ldlm_res_id res_id;
2230         struct obd_device *obd = class_exp2obd(exp);
2231
2232         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2233         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2234         return 0;
2235 }
2236
2237 /* find any ldlm lock of the inode in osc
2238  * return 0    not find
2239  *        1    find one
2240  *      < 0    error */
2241 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2242                            ldlm_iterator_t replace, void *data)
2243 {
2244         struct ldlm_res_id res_id;
2245         struct obd_device *obd = class_exp2obd(exp);
2246         int rc = 0;
2247
2248         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2249         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2250         if (rc == LDLM_ITER_STOP)
2251                 return(1);
2252         if (rc == LDLM_ITER_CONTINUE)
2253                 return(0);
2254         return(rc);
2255 }
2256
2257 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2258                             obd_enqueue_update_f upcall, void *cookie,
2259                             int *flags, int agl, int rc)
2260 {
2261         int intent = *flags & LDLM_FL_HAS_INTENT;
2262         ENTRY;
2263
2264         if (intent) {
2265                 /* The request was created before ldlm_cli_enqueue call. */
2266                 if (rc == ELDLM_LOCK_ABORTED) {
2267                         struct ldlm_reply *rep;
2268                         rep = req_capsule_server_get(&req->rq_pill,
2269                                                      &RMF_DLM_REP);
2270
2271                         LASSERT(rep != NULL);
2272                         if (rep->lock_policy_res1)
2273                                 rc = rep->lock_policy_res1;
2274                 }
2275         }
2276
2277         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2278             (rc == 0)) {
2279                 *flags |= LDLM_FL_LVB_READY;
2280                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2281                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2282         }
2283
2284         /* Call the update callback. */
2285         rc = (*upcall)(cookie, rc);
2286         RETURN(rc);
2287 }
2288
2289 static int osc_enqueue_interpret(const struct lu_env *env,
2290                                  struct ptlrpc_request *req,
2291                                  struct osc_enqueue_args *aa, int rc)
2292 {
2293         struct ldlm_lock *lock;
2294         struct lustre_handle handle;
2295         __u32 mode;
2296         struct ost_lvb *lvb;
2297         __u32 lvb_len;
2298         int *flags = aa->oa_flags;
2299
2300         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2301          * might be freed anytime after lock upcall has been called. */
2302         lustre_handle_copy(&handle, aa->oa_lockh);
2303         mode = aa->oa_ei->ei_mode;
2304
2305         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2306          * be valid. */
2307         lock = ldlm_handle2lock(&handle);
2308
2309         /* Take an additional reference so that a blocking AST that
2310          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2311          * to arrive after an upcall has been executed by
2312          * osc_enqueue_fini(). */
2313         ldlm_lock_addref(&handle, mode);
2314
2315         /* Let CP AST to grant the lock first. */
2316         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2317
2318         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2319                 lvb = NULL;
2320                 lvb_len = 0;
2321         } else {
2322                 lvb = aa->oa_lvb;
2323                 lvb_len = sizeof(*aa->oa_lvb);
2324         }
2325
2326         /* Complete obtaining the lock procedure. */
2327         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2328                                    mode, flags, lvb, lvb_len, &handle, rc);
2329         /* Complete osc stuff. */
2330         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2331                               flags, aa->oa_agl, rc);
2332
2333         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2334
2335         /* Release the lock for async request. */
2336         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2337                 /*
2338                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2339                  * not already released by
2340                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2341                  */
2342                 ldlm_lock_decref(&handle, mode);
2343
2344         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2345                  aa->oa_lockh, req, aa);
2346         ldlm_lock_decref(&handle, mode);
2347         LDLM_LOCK_PUT(lock);
2348         return rc;
2349 }
2350
2351 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2352                         struct lov_oinfo *loi, int flags,
2353                         struct ost_lvb *lvb, __u32 mode, int rc)
2354 {
2355         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2356
2357         if (rc == ELDLM_OK) {
2358                 __u64 tmp;
2359
2360                 LASSERT(lock != NULL);
2361                 loi->loi_lvb = *lvb;
2362                 tmp = loi->loi_lvb.lvb_size;
2363                 /* Extend KMS up to the end of this lock and no further
2364                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2365                 if (tmp > lock->l_policy_data.l_extent.end)
2366                         tmp = lock->l_policy_data.l_extent.end + 1;
2367                 if (tmp >= loi->loi_kms) {
2368                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2369                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2370                         loi_kms_set(loi, tmp);
2371                 } else {
2372                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2373                                    LPU64"; leaving kms="LPU64", end="LPU64,
2374                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2375                                    lock->l_policy_data.l_extent.end);
2376                 }
2377                 ldlm_lock_allow_match(lock);
2378         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2379                 LASSERT(lock != NULL);
2380                 loi->loi_lvb = *lvb;
2381                 ldlm_lock_allow_match(lock);
2382                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2383                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2384                 rc = ELDLM_OK;
2385         }
2386
2387         if (lock != NULL) {
2388                 if (rc != ELDLM_OK)
2389                         ldlm_lock_fail_match(lock);
2390
2391                 LDLM_LOCK_PUT(lock);
2392         }
2393 }
2394 EXPORT_SYMBOL(osc_update_enqueue);
2395
2396 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2397
2398 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2399  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2400  * other synchronous requests, however keeping some locks and trying to obtain
2401  * others may take a considerable amount of time in a case of ost failure; and
2402  * when other sync requests do not get released lock from a client, the client
2403  * is excluded from the cluster -- such scenarious make the life difficult, so
2404  * release locks just after they are obtained. */
2405 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2406                      int *flags, ldlm_policy_data_t *policy,
2407                      struct ost_lvb *lvb, int kms_valid,
2408                      obd_enqueue_update_f upcall, void *cookie,
2409                      struct ldlm_enqueue_info *einfo,
2410                      struct lustre_handle *lockh,
2411                      struct ptlrpc_request_set *rqset, int async, int agl)
2412 {
2413         struct obd_device *obd = exp->exp_obd;
2414         struct ptlrpc_request *req = NULL;
2415         int intent = *flags & LDLM_FL_HAS_INTENT;
2416         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2417         ldlm_mode_t mode;
2418         int rc;
2419         ENTRY;
2420
2421         /* Filesystem lock extents are extended to page boundaries so that
2422          * dealing with the page cache is a little smoother.  */
2423         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2424         policy->l_extent.end |= ~CFS_PAGE_MASK;
2425
2426         /*
2427          * kms is not valid when either object is completely fresh (so that no
2428          * locks are cached), or object was evicted. In the latter case cached
2429          * lock cannot be used, because it would prime inode state with
2430          * potentially stale LVB.
2431          */
2432         if (!kms_valid)
2433                 goto no_match;
2434
2435         /* Next, search for already existing extent locks that will cover us */
2436         /* If we're trying to read, we also search for an existing PW lock.  The
2437          * VFS and page cache already protect us locally, so lots of readers/
2438          * writers can share a single PW lock.
2439          *
2440          * There are problems with conversion deadlocks, so instead of
2441          * converting a read lock to a write lock, we'll just enqueue a new
2442          * one.
2443          *
2444          * At some point we should cancel the read lock instead of making them
2445          * send us a blocking callback, but there are problems with canceling
2446          * locks out from other users right now, too. */
2447         mode = einfo->ei_mode;
2448         if (einfo->ei_mode == LCK_PR)
2449                 mode |= LCK_PW;
2450         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2451                                einfo->ei_type, policy, mode, lockh, 0);
2452         if (mode) {
2453                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2454
2455                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2456                         /* For AGL, if enqueue RPC is sent but the lock is not
2457                          * granted, then skip to process this strpe.
2458                          * Return -ECANCELED to tell the caller. */
2459                         ldlm_lock_decref(lockh, mode);
2460                         LDLM_LOCK_PUT(matched);
2461                         RETURN(-ECANCELED);
2462                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2463                         *flags |= LDLM_FL_LVB_READY;
2464                         /* addref the lock only if not async requests and PW
2465                          * lock is matched whereas we asked for PR. */
2466                         if (!rqset && einfo->ei_mode != mode)
2467                                 ldlm_lock_addref(lockh, LCK_PR);
2468                         if (intent) {
2469                                 /* I would like to be able to ASSERT here that
2470                                  * rss <= kms, but I can't, for reasons which
2471                                  * are explained in lov_enqueue() */
2472                         }
2473
2474                         /* We already have a lock, and it's referenced */
2475                         (*upcall)(cookie, ELDLM_OK);
2476
2477                         if (einfo->ei_mode != mode)
2478                                 ldlm_lock_decref(lockh, LCK_PW);
2479                         else if (rqset)
2480                                 /* For async requests, decref the lock. */
2481                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2482                         LDLM_LOCK_PUT(matched);
2483                         RETURN(ELDLM_OK);
2484                 } else {
2485                         ldlm_lock_decref(lockh, mode);
2486                         LDLM_LOCK_PUT(matched);
2487                 }
2488         }
2489
2490  no_match:
2491         if (intent) {
2492                 CFS_LIST_HEAD(cancels);
2493                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2494                                            &RQF_LDLM_ENQUEUE_LVB);
2495                 if (req == NULL)
2496                         RETURN(-ENOMEM);
2497
2498                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2499                 if (rc) {
2500                         ptlrpc_request_free(req);
2501                         RETURN(rc);
2502                 }
2503
2504                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2505                                      sizeof *lvb);
2506                 ptlrpc_request_set_replen(req);
2507         }
2508
2509         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2510         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2511
2512         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2513                               sizeof(*lvb), lockh, async);
2514         if (rqset) {
2515                 if (!rc) {
2516                         struct osc_enqueue_args *aa;
2517                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2518                         aa = ptlrpc_req_async_args(req);
2519                         aa->oa_ei = einfo;
2520                         aa->oa_exp = exp;
2521                         aa->oa_flags  = flags;
2522                         aa->oa_upcall = upcall;
2523                         aa->oa_cookie = cookie;
2524                         aa->oa_lvb    = lvb;
2525                         aa->oa_lockh  = lockh;
2526                         aa->oa_agl    = !!agl;
2527
2528                         req->rq_interpret_reply =
2529                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2530                         if (rqset == PTLRPCD_SET)
2531                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2532                         else
2533                                 ptlrpc_set_add_req(rqset, req);
2534                 } else if (intent) {
2535                         ptlrpc_req_finished(req);
2536                 }
2537                 RETURN(rc);
2538         }
2539
2540         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2541         if (intent)
2542                 ptlrpc_req_finished(req);
2543
2544         RETURN(rc);
2545 }
2546
2547 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2548                        struct ldlm_enqueue_info *einfo,
2549                        struct ptlrpc_request_set *rqset)
2550 {
2551         struct ldlm_res_id res_id;
2552         int rc;
2553         ENTRY;
2554
2555         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2556                            oinfo->oi_md->lsm_object_seq, &res_id);
2557
2558         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2559                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2560                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2561                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2562                               rqset, rqset != NULL, 0);
2563         RETURN(rc);
2564 }
2565
2566 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2567                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2568                    int *flags, void *data, struct lustre_handle *lockh,
2569                    int unref)
2570 {
2571         struct obd_device *obd = exp->exp_obd;
2572         int lflags = *flags;
2573         ldlm_mode_t rc;
2574         ENTRY;
2575
2576         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2577                 RETURN(-EIO);
2578
2579         /* Filesystem lock extents are extended to page boundaries so that
2580          * dealing with the page cache is a little smoother */
2581         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2582         policy->l_extent.end |= ~CFS_PAGE_MASK;
2583
2584         /* Next, search for already existing extent locks that will cover us */
2585         /* If we're trying to read, we also search for an existing PW lock.  The
2586          * VFS and page cache already protect us locally, so lots of readers/
2587          * writers can share a single PW lock. */
2588         rc = mode;
2589         if (mode == LCK_PR)
2590                 rc |= LCK_PW;
2591         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2592                              res_id, type, policy, rc, lockh, unref);
2593         if (rc) {
2594                 if (data != NULL) {
2595                         if (!osc_set_data_with_check(lockh, data)) {
2596                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2597                                         ldlm_lock_decref(lockh, rc);
2598                                 RETURN(0);
2599                         }
2600                 }
2601                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2602                         ldlm_lock_addref(lockh, LCK_PR);
2603                         ldlm_lock_decref(lockh, LCK_PW);
2604                 }
2605                 RETURN(rc);
2606         }
2607         RETURN(rc);
2608 }
2609
2610 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2611 {
2612         ENTRY;
2613
2614         if (unlikely(mode == LCK_GROUP))
2615                 ldlm_lock_decref_and_cancel(lockh, mode);
2616         else
2617                 ldlm_lock_decref(lockh, mode);
2618
2619         RETURN(0);
2620 }
2621
2622 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2623                       __u32 mode, struct lustre_handle *lockh)
2624 {
2625         ENTRY;
2626         RETURN(osc_cancel_base(lockh, mode));
2627 }
2628
2629 static int osc_cancel_unused(struct obd_export *exp,
2630                              struct lov_stripe_md *lsm,
2631                              ldlm_cancel_flags_t flags,
2632                              void *opaque)
2633 {
2634         struct obd_device *obd = class_exp2obd(exp);
2635         struct ldlm_res_id res_id, *resp = NULL;
2636
2637         if (lsm != NULL) {
2638                 resp = osc_build_res_name(lsm->lsm_object_id,
2639                                           lsm->lsm_object_seq, &res_id);
2640         }
2641
2642         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2643 }
2644
2645 static int osc_statfs_interpret(const struct lu_env *env,
2646                                 struct ptlrpc_request *req,
2647                                 struct osc_async_args *aa, int rc)
2648 {
2649         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2650         struct obd_statfs *msfs;
2651         __u64 used;
2652         ENTRY;
2653
2654         if (rc == -EBADR)
2655                 /* The request has in fact never been sent
2656                  * due to issues at a higher level (LOV).
2657                  * Exit immediately since the caller is
2658                  * aware of the problem and takes care
2659                  * of the clean up */
2660                  RETURN(rc);
2661
2662         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2663             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2664                 GOTO(out, rc = 0);
2665
2666         if (rc != 0)
2667                 GOTO(out, rc);
2668
2669         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2670         if (msfs == NULL) {
2671                 GOTO(out, rc = -EPROTO);
2672         }
2673
2674         /* Reinitialize the RDONLY and DEGRADED flags at the client
2675          * on each statfs, so they don't stay set permanently. */
2676         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2677
2678         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2679                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2680         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2681                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2682
2683         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2684                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2685         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2686                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2687
2688         /* Add a bit of hysteresis so this flag isn't continually flapping,
2689          * and ensure that new files don't get extremely fragmented due to
2690          * only a small amount of available space in the filesystem.
2691          * We want to set the NOSPC flag when there is less than ~0.1% free
2692          * and clear it when there is at least ~0.2% free space, so:
2693          *                   avail < ~0.1% max          max = avail + used
2694          *            1025 * avail < avail + used       used = blocks - free
2695          *            1024 * avail < used
2696          *            1024 * avail < blocks - free
2697          *                   avail < ((blocks - free) >> 10)
2698          *
2699          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2700          * lose that amount of space so in those cases we report no space left
2701          * if their is less than 1 GB left.                             */
2702         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2703         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2704                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2705                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2706         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2707                           (msfs->os_ffree > 64) &&
2708                           (msfs->os_bavail > (used << 1)))) {
2709                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2710                                              OSCC_FLAG_NOSPC_BLK);
2711         }
2712
2713         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2714                      (msfs->os_bavail < used)))
2715                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2716
2717         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2718
2719         *aa->aa_oi->oi_osfs = *msfs;
2720 out:
2721         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2722         RETURN(rc);
2723 }
2724
2725 static int osc_statfs_async(struct obd_export *exp,
2726                             struct obd_info *oinfo, __u64 max_age,
2727                             struct ptlrpc_request_set *rqset)
2728 {
2729         struct obd_device     *obd = class_exp2obd(exp);
2730         struct ptlrpc_request *req;
2731         struct osc_async_args *aa;
2732         int                    rc;
2733         ENTRY;
2734
2735         /* We could possibly pass max_age in the request (as an absolute
2736          * timestamp or a "seconds.usec ago") so the target can avoid doing
2737          * extra calls into the filesystem if that isn't necessary (e.g.
2738          * during mount that would help a bit).  Having relative timestamps
2739          * is not so great if request processing is slow, while absolute
2740          * timestamps are not ideal because they need time synchronization. */
2741         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2742         if (req == NULL)
2743                 RETURN(-ENOMEM);
2744
2745         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2746         if (rc) {
2747                 ptlrpc_request_free(req);
2748                 RETURN(rc);
2749         }
2750         ptlrpc_request_set_replen(req);
2751         req->rq_request_portal = OST_CREATE_PORTAL;
2752         ptlrpc_at_set_req_timeout(req);
2753
2754         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2755                 /* procfs requests not want stat in wait for avoid deadlock */
2756                 req->rq_no_resend = 1;
2757                 req->rq_no_delay = 1;
2758         }
2759
2760         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2761         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2762         aa = ptlrpc_req_async_args(req);
2763         aa->aa_oi = oinfo;
2764
2765         ptlrpc_set_add_req(rqset, req);
2766         RETURN(0);
2767 }
2768
2769 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2770                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2771 {
2772         struct obd_device     *obd = class_exp2obd(exp);
2773         struct obd_statfs     *msfs;
2774         struct ptlrpc_request *req;
2775         struct obd_import     *imp = NULL;
2776         int rc;
2777         ENTRY;
2778
2779         /*Since the request might also come from lprocfs, so we need
2780          *sync this with client_disconnect_export Bug15684*/
2781         cfs_down_read(&obd->u.cli.cl_sem);
2782         if (obd->u.cli.cl_import)
2783                 imp = class_import_get(obd->u.cli.cl_import);
2784         cfs_up_read(&obd->u.cli.cl_sem);
2785         if (!imp)
2786                 RETURN(-ENODEV);
2787
2788         /* We could possibly pass max_age in the request (as an absolute
2789          * timestamp or a "seconds.usec ago") so the target can avoid doing
2790          * extra calls into the filesystem if that isn't necessary (e.g.
2791          * during mount that would help a bit).  Having relative timestamps
2792          * is not so great if request processing is slow, while absolute
2793          * timestamps are not ideal because they need time synchronization. */
2794         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2795
2796         class_import_put(imp);
2797
2798         if (req == NULL)
2799                 RETURN(-ENOMEM);
2800
2801         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2802         if (rc) {
2803                 ptlrpc_request_free(req);
2804                 RETURN(rc);
2805         }
2806         ptlrpc_request_set_replen(req);
2807         req->rq_request_portal = OST_CREATE_PORTAL;
2808         ptlrpc_at_set_req_timeout(req);
2809
2810         if (flags & OBD_STATFS_NODELAY) {
2811                 /* procfs requests not want stat in wait for avoid deadlock */
2812                 req->rq_no_resend = 1;
2813                 req->rq_no_delay = 1;
2814         }
2815
2816         rc = ptlrpc_queue_wait(req);
2817         if (rc)
2818                 GOTO(out, rc);
2819
2820         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2821         if (msfs == NULL) {
2822                 GOTO(out, rc = -EPROTO);
2823         }
2824
2825         *osfs = *msfs;
2826
2827         EXIT;
2828  out:
2829         ptlrpc_req_finished(req);
2830         return rc;
2831 }
2832
2833 /* Retrieve object striping information.
2834  *
2835  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2836  * the maximum number of OST indices which will fit in the user buffer.
2837  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2838  */
2839 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2840 {
2841         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2842         struct lov_user_md_v3 lum, *lumk;
2843         struct lov_user_ost_data_v1 *lmm_objects;
2844         int rc = 0, lum_size;
2845         ENTRY;
2846
2847         if (!lsm)
2848                 RETURN(-ENODATA);
2849
2850         /* we only need the header part from user space to get lmm_magic and
2851          * lmm_stripe_count, (the header part is common to v1 and v3) */
2852         lum_size = sizeof(struct lov_user_md_v1);
2853         if (cfs_copy_from_user(&lum, lump, lum_size))
2854                 RETURN(-EFAULT);
2855
2856         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2857             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2858                 RETURN(-EINVAL);
2859
2860         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2861         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2862         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2863         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2864
2865         /* we can use lov_mds_md_size() to compute lum_size
2866          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2867         if (lum.lmm_stripe_count > 0) {
2868                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2869                 OBD_ALLOC(lumk, lum_size);
2870                 if (!lumk)
2871                         RETURN(-ENOMEM);
2872
2873                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2874                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2875                 else
2876                         lmm_objects = &(lumk->lmm_objects[0]);
2877                 lmm_objects->l_object_id = lsm->lsm_object_id;
2878         } else {
2879                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2880                 lumk = &lum;
2881         }
2882
2883         lumk->lmm_object_id = lsm->lsm_object_id;
2884         lumk->lmm_object_seq = lsm->lsm_object_seq;
2885         lumk->lmm_stripe_count = 1;
2886
2887         if (cfs_copy_to_user(lump, lumk, lum_size))
2888                 rc = -EFAULT;
2889
2890         if (lumk != &lum)
2891                 OBD_FREE(lumk, lum_size);
2892
2893         RETURN(rc);
2894 }
2895
2896
2897 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2898                          void *karg, void *uarg)
2899 {
2900         struct obd_device *obd = exp->exp_obd;
2901         struct obd_ioctl_data *data = karg;
2902         int err = 0;
2903         ENTRY;
2904
2905         if (!cfs_try_module_get(THIS_MODULE)) {
2906                 CERROR("Can't get module. Is it alive?");
2907                 return -EINVAL;
2908         }
2909         switch (cmd) {
2910         case OBD_IOC_LOV_GET_CONFIG: {
2911                 char *buf;
2912                 struct lov_desc *desc;
2913                 struct obd_uuid uuid;
2914
2915                 buf = NULL;
2916                 len = 0;
2917                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2918                         GOTO(out, err = -EINVAL);
2919
2920                 data = (struct obd_ioctl_data *)buf;
2921
2922                 if (sizeof(*desc) > data->ioc_inllen1) {
2923                         obd_ioctl_freedata(buf, len);
2924                         GOTO(out, err = -EINVAL);
2925                 }
2926
2927                 if (data->ioc_inllen2 < sizeof(uuid)) {
2928                         obd_ioctl_freedata(buf, len);
2929                         GOTO(out, err = -EINVAL);
2930                 }
2931
2932                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2933                 desc->ld_tgt_count = 1;
2934                 desc->ld_active_tgt_count = 1;
2935                 desc->ld_default_stripe_count = 1;
2936                 desc->ld_default_stripe_size = 0;
2937                 desc->ld_default_stripe_offset = 0;
2938                 desc->ld_pattern = 0;
2939                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2940
2941                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2942
2943                 err = cfs_copy_to_user((void *)uarg, buf, len);
2944                 if (err)
2945                         err = -EFAULT;
2946                 obd_ioctl_freedata(buf, len);
2947                 GOTO(out, err);
2948         }
2949         case LL_IOC_LOV_SETSTRIPE:
2950                 err = obd_alloc_memmd(exp, karg);
2951                 if (err > 0)
2952                         err = 0;
2953                 GOTO(out, err);
2954         case LL_IOC_LOV_GETSTRIPE:
2955                 err = osc_getstripe(karg, uarg);
2956                 GOTO(out, err);
2957         case OBD_IOC_CLIENT_RECOVER:
2958                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2959                                             data->ioc_inlbuf1, 0);
2960                 if (err > 0)
2961                         err = 0;
2962                 GOTO(out, err);
2963         case IOC_OSC_SET_ACTIVE:
2964                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2965                                                data->ioc_offset);
2966                 GOTO(out, err);
2967         case OBD_IOC_POLL_QUOTACHECK:
2968                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2969                 GOTO(out, err);
2970         case OBD_IOC_PING_TARGET:
2971                 err = ptlrpc_obd_ping(obd);
2972                 GOTO(out, err);
2973         default:
2974                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2975                        cmd, cfs_curproc_comm());
2976                 GOTO(out, err = -ENOTTY);
2977         }
2978 out:
2979         cfs_module_put(THIS_MODULE);
2980         return err;
2981 }
2982
2983 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2984                         obd_count keylen, void *key, __u32 *vallen, void *val,
2985                         struct lov_stripe_md *lsm)
2986 {
2987         ENTRY;
2988         if (!vallen || !val)
2989                 RETURN(-EFAULT);
2990
2991         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2992                 __u32 *stripe = val;
2993                 *vallen = sizeof(*stripe);
2994                 *stripe = 0;
2995                 RETURN(0);
2996         } else if (KEY_IS(KEY_LAST_ID)) {
2997                 struct ptlrpc_request *req;
2998                 obd_id                *reply;
2999                 char                  *tmp;
3000                 int                    rc;
3001
3002                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3003                                            &RQF_OST_GET_INFO_LAST_ID);
3004                 if (req == NULL)
3005                         RETURN(-ENOMEM);
3006
3007                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3008                                      RCL_CLIENT, keylen);
3009                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3010                 if (rc) {
3011                         ptlrpc_request_free(req);
3012                         RETURN(rc);
3013                 }
3014
3015                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3016                 memcpy(tmp, key, keylen);
3017
3018                 req->rq_no_delay = req->rq_no_resend = 1;
3019                 ptlrpc_request_set_replen(req);
3020                 rc = ptlrpc_queue_wait(req);
3021                 if (rc)
3022                         GOTO(out, rc);
3023
3024                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3025                 if (reply == NULL)
3026                         GOTO(out, rc = -EPROTO);
3027
3028                 *((obd_id *)val) = *reply;
3029         out:
3030                 ptlrpc_req_finished(req);
3031                 RETURN(rc);
3032         } else if (KEY_IS(KEY_FIEMAP)) {
3033                 struct ptlrpc_request *req;
3034                 struct ll_user_fiemap *reply;
3035                 char *tmp;
3036                 int rc;
3037
3038                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3039                                            &RQF_OST_GET_INFO_FIEMAP);
3040                 if (req == NULL)
3041                         RETURN(-ENOMEM);
3042
3043                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3044                                      RCL_CLIENT, keylen);
3045                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3046                                      RCL_CLIENT, *vallen);
3047                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3048                                      RCL_SERVER, *vallen);
3049
3050                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3051                 if (rc) {
3052                         ptlrpc_request_free(req);
3053                         RETURN(rc);
3054                 }
3055
3056                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3057                 memcpy(tmp, key, keylen);
3058                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3059                 memcpy(tmp, val, *vallen);
3060
3061                 ptlrpc_request_set_replen(req);
3062                 rc = ptlrpc_queue_wait(req);
3063                 if (rc)
3064                         GOTO(out1, rc);
3065
3066                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3067                 if (reply == NULL)
3068                         GOTO(out1, rc = -EPROTO);
3069
3070                 memcpy(val, reply, *vallen);
3071         out1:
3072                 ptlrpc_req_finished(req);
3073
3074                 RETURN(rc);
3075         }
3076
3077         RETURN(-EINVAL);
3078 }
3079
3080 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3081 {
3082         struct llog_ctxt *ctxt;
3083         int rc = 0;
3084         ENTRY;
3085
3086         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3087         if (ctxt) {
3088                 rc = llog_initiator_connect(ctxt);
3089                 llog_ctxt_put(ctxt);
3090         } else {
3091                 /* XXX return an error? skip setting below flags? */
3092         }
3093
3094         cfs_spin_lock(&imp->imp_lock);
3095         imp->imp_server_timeout = 1;
3096         imp->imp_pingable = 1;
3097         cfs_spin_unlock(&imp->imp_lock);
3098         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3099
3100         RETURN(rc);
3101 }
3102
3103 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3104                                           struct ptlrpc_request *req,
3105                                           void *aa, int rc)
3106 {
3107         ENTRY;
3108         if (rc != 0)
3109                 RETURN(rc);
3110
3111         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3112 }
3113
3114 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3115                               obd_count keylen, void *key, obd_count vallen,
3116                               void *val, struct ptlrpc_request_set *set)
3117 {
3118         struct ptlrpc_request *req;
3119         struct obd_device     *obd = exp->exp_obd;
3120         struct obd_import     *imp = class_exp2cliimp(exp);
3121         char                  *tmp;
3122         int                    rc;
3123         ENTRY;
3124
3125         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3126
3127         if (KEY_IS(KEY_NEXT_ID)) {
3128                 obd_id new_val;
3129                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3130
3131                 if (vallen != sizeof(obd_id))
3132                         RETURN(-ERANGE);
3133                 if (val == NULL)
3134                         RETURN(-EINVAL);
3135
3136                 if (vallen != sizeof(obd_id))
3137                         RETURN(-EINVAL);
3138
3139                 /* avoid race between allocate new object and set next id
3140                  * from ll_sync thread */
3141                 cfs_spin_lock(&oscc->oscc_lock);
3142                 new_val = *((obd_id*)val) + 1;
3143                 if (new_val > oscc->oscc_next_id)
3144                         oscc->oscc_next_id = new_val;
3145                 cfs_spin_unlock(&oscc->oscc_lock);
3146                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3147                        exp->exp_obd->obd_name,
3148                        obd->u.cli.cl_oscc.oscc_next_id);
3149
3150                 RETURN(0);
3151         }
3152
3153         if (KEY_IS(KEY_CHECKSUM)) {
3154                 if (vallen != sizeof(int))
3155                         RETURN(-EINVAL);
3156                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3157                 RETURN(0);
3158         }
3159
3160         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3161                 sptlrpc_conf_client_adapt(obd);
3162                 RETURN(0);
3163         }
3164
3165         if (KEY_IS(KEY_FLUSH_CTX)) {
3166                 sptlrpc_import_flush_my_ctx(imp);
3167                 RETURN(0);
3168         }
3169
3170         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3171                 RETURN(-EINVAL);
3172
3173         /* We pass all other commands directly to OST. Since nobody calls osc
3174            methods directly and everybody is supposed to go through LOV, we
3175            assume lov checked invalid values for us.
3176            The only recognised values so far are evict_by_nid and mds_conn.
3177            Even if something bad goes through, we'd get a -EINVAL from OST
3178            anyway. */
3179
3180         if (KEY_IS(KEY_GRANT_SHRINK))
3181                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3182         else
3183                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3184
3185         if (req == NULL)
3186                 RETURN(-ENOMEM);
3187
3188         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3189                              RCL_CLIENT, keylen);
3190         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3191                              RCL_CLIENT, vallen);
3192         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3193         if (rc) {
3194                 ptlrpc_request_free(req);
3195                 RETURN(rc);
3196         }
3197
3198         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3199         memcpy(tmp, key, keylen);
3200         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3201         memcpy(tmp, val, vallen);
3202
3203         if (KEY_IS(KEY_MDS_CONN)) {
3204                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3205
3206                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3207                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3208                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3209                 req->rq_no_delay = req->rq_no_resend = 1;
3210                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3211         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3212                 struct osc_grant_args *aa;
3213                 struct obdo *oa;
3214
3215                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3216                 aa = ptlrpc_req_async_args(req);
3217                 OBDO_ALLOC(oa);
3218                 if (!oa) {
3219                         ptlrpc_req_finished(req);
3220                         RETURN(-ENOMEM);
3221                 }
3222                 *oa = ((struct ost_body *)val)->oa;
3223                 aa->aa_oa = oa;
3224                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3225         }
3226
3227         ptlrpc_request_set_replen(req);
3228         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3229                 LASSERT(set != NULL);
3230                 ptlrpc_set_add_req(set, req);
3231                 ptlrpc_check_set(NULL, set);
3232         } else
3233                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3234
3235         RETURN(0);
3236 }
3237
3238
3239 static struct llog_operations osc_size_repl_logops = {
3240         lop_cancel: llog_obd_repl_cancel
3241 };
3242
3243 static struct llog_operations osc_mds_ost_orig_logops;
3244
3245 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3246                            struct obd_device *tgt, struct llog_catid *catid)
3247 {
3248         int rc;
3249         ENTRY;
3250
3251         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3252                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3253         if (rc) {
3254                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3255                 GOTO(out, rc);
3256         }
3257
3258         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3259                         NULL, &osc_size_repl_logops);
3260         if (rc) {
3261                 struct llog_ctxt *ctxt =
3262                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3263                 if (ctxt)
3264                         llog_cleanup(ctxt);
3265                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3266         }
3267         GOTO(out, rc);
3268 out:
3269         if (rc) {
3270                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3271                        obd->obd_name, tgt->obd_name, catid, rc);
3272                 CERROR("logid "LPX64":0x%x\n",
3273                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3274         }
3275         return rc;
3276 }
3277
3278 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3279                          struct obd_device *disk_obd, int *index)
3280 {
3281         struct llog_catid catid;
3282         static char name[32] = CATLIST;
3283         int rc;
3284         ENTRY;
3285
3286         LASSERT(olg == &obd->obd_olg);
3287
3288         cfs_mutex_lock(&olg->olg_cat_processing);
3289         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3290         if (rc) {
3291                 CERROR("rc: %d\n", rc);
3292                 GOTO(out, rc);
3293         }
3294
3295         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3296                obd->obd_name, *index, catid.lci_logid.lgl_oid,
3297                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3298
3299         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3300         if (rc) {
3301                 CERROR("rc: %d\n", rc);
3302                 GOTO(out, rc);
3303         }
3304
3305         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3306         if (rc) {
3307                 CERROR("rc: %d\n", rc);
3308                 GOTO(out, rc);
3309         }
3310
3311  out:
3312         cfs_mutex_unlock(&olg->olg_cat_processing);
3313
3314         return rc;
3315 }
3316
3317 static int osc_llog_finish(struct obd_device *obd, int count)
3318 {
3319         struct llog_ctxt *ctxt;
3320         int rc = 0, rc2 = 0;
3321         ENTRY;
3322
3323         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3324         if (ctxt)
3325                 rc = llog_cleanup(ctxt);
3326
3327         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3328         if (ctxt)
3329                 rc2 = llog_cleanup(ctxt);
3330         if (!rc)
3331                 rc = rc2;
3332
3333         RETURN(rc);
3334 }
3335
3336 static int osc_reconnect(const struct lu_env *env,
3337                          struct obd_export *exp, struct obd_device *obd,
3338                          struct obd_uuid *cluuid,
3339                          struct obd_connect_data *data,
3340                          void *localdata)
3341 {
3342         struct client_obd *cli = &obd->u.cli;
3343
3344         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3345                 long lost_grant;
3346
3347                 client_obd_list_lock(&cli->cl_loi_list_lock);
3348                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3349                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3350                 lost_grant = cli->cl_lost_grant;
3351                 cli->cl_lost_grant = 0;
3352                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3353
3354                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3355                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3356                        data->ocd_version, data->ocd_grant, lost_grant);
3357         }
3358
3359         RETURN(0);
3360 }
3361
3362 static int osc_disconnect(struct obd_export *exp)
3363 {
3364         struct obd_device *obd = class_exp2obd(exp);
3365         struct llog_ctxt  *ctxt;
3366         int rc;
3367
3368         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3369         if (ctxt) {
3370                 if (obd->u.cli.cl_conn_count == 1) {
3371                         /* Flush any remaining cancel messages out to the
3372                          * target */
3373                         llog_sync(ctxt, exp, 0);
3374                 }
3375                 llog_ctxt_put(ctxt);
3376         } else {
3377                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3378                        obd);
3379         }
3380
3381         rc = client_disconnect_export(exp);
3382         /**
3383          * Initially we put del_shrink_grant before disconnect_export, but it
3384          * causes the following problem if setup (connect) and cleanup
3385          * (disconnect) are tangled together.
3386          *      connect p1                     disconnect p2
3387          *   ptlrpc_connect_import
3388          *     ...............               class_manual_cleanup
3389          *                                     osc_disconnect
3390          *                                     del_shrink_grant
3391          *   ptlrpc_connect_interrupt
3392          *     init_grant_shrink
3393          *   add this client to shrink list
3394          *                                      cleanup_osc
3395          * Bang! pinger trigger the shrink.
3396          * So the osc should be disconnected from the shrink list, after we
3397          * are sure the import has been destroyed. BUG18662
3398          */
3399         if (obd->u.cli.cl_import == NULL)
3400                 osc_del_shrink_grant(&obd->u.cli);
3401         return rc;
3402 }
3403
3404 static int osc_import_event(struct obd_device *obd,
3405                             struct obd_import *imp,
3406                             enum obd_import_event event)
3407 {
3408         struct client_obd *cli;
3409         int rc = 0;
3410
3411         ENTRY;
3412         LASSERT(imp->imp_obd == obd);
3413
3414         switch (event) {
3415         case IMP_EVENT_DISCON: {
3416                 /* Only do this on the MDS OSC's */
3417                 if (imp->imp_server_timeout) {
3418                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3419
3420                         cfs_spin_lock(&oscc->oscc_lock);
3421                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3422                         cfs_spin_unlock(&oscc->oscc_lock);
3423                 }
3424                 cli = &obd->u.cli;
3425                 client_obd_list_lock(&cli->cl_loi_list_lock);
3426                 cli->cl_avail_grant = 0;
3427                 cli->cl_lost_grant = 0;
3428                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3429                 break;
3430         }
3431         case IMP_EVENT_INACTIVE: {
3432                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3433                 break;
3434         }
3435         case IMP_EVENT_INVALIDATE: {
3436                 struct ldlm_namespace *ns = obd->obd_namespace;
3437                 struct lu_env         *env;
3438                 int                    refcheck;
3439
3440                 env = cl_env_get(&refcheck);
3441                 if (!IS_ERR(env)) {
3442                         /* Reset grants */
3443                         cli = &obd->u.cli;
3444                         /* all pages go to failing rpcs due to the invalid
3445                          * import */
3446                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3447
3448                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3449                         cl_env_put(env, &refcheck);
3450                 } else
3451                         rc = PTR_ERR(env);
3452                 break;
3453         }
3454         case IMP_EVENT_ACTIVE: {
3455                 /* Only do this on the MDS OSC's */
3456                 if (imp->imp_server_timeout) {
3457                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3458
3459                         cfs_spin_lock(&oscc->oscc_lock);
3460                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3461                                               OSCC_FLAG_NOSPC_BLK);
3462                         cfs_spin_unlock(&oscc->oscc_lock);
3463                 }
3464                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3465                 break;
3466         }
3467         case IMP_EVENT_OCD: {
3468                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3469
3470                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3471                         osc_init_grant(&obd->u.cli, ocd);
3472
3473                 /* See bug 7198 */
3474                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3475                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3476
3477                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3478                 break;
3479         }
3480         case IMP_EVENT_DEACTIVATE: {
3481                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3482                 break;
3483         }
3484         case IMP_EVENT_ACTIVATE: {
3485                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3486                 break;
3487         }
3488         default:
3489                 CERROR("Unknown import event %d\n", event);
3490                 LBUG();
3491         }
3492         RETURN(rc);
3493 }
3494
3495 /**
3496  * Determine whether the lock can be canceled before replaying the lock
3497  * during recovery, see bug16774 for detailed information.
3498  *
3499  * \retval zero the lock can't be canceled
3500  * \retval other ok to cancel
3501  */
3502 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3503 {
3504         check_res_locked(lock->l_resource);
3505
3506         /*
3507          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3508          *
3509          * XXX as a future improvement, we can also cancel unused write lock
3510          * if it doesn't have dirty data and active mmaps.
3511          */
3512         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3513             (lock->l_granted_mode == LCK_PR ||
3514              lock->l_granted_mode == LCK_CR) &&
3515             (osc_dlm_lock_pageref(lock) == 0))
3516                 RETURN(1);
3517
3518         RETURN(0);
3519 }
3520
3521 static int brw_queue_work(const struct lu_env *env, void *data)
3522 {
3523         struct client_obd *cli = data;
3524
3525         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3526
3527         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3528         RETURN(0);
3529 }
3530
3531 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3532 {
3533         struct client_obd *cli = &obd->u.cli;
3534         int rc;
3535         ENTRY;
3536
3537         ENTRY;
3538         rc = ptlrpcd_addref();
3539         if (rc)
3540                 RETURN(rc);
3541
3542         rc = client_obd_setup(obd, lcfg);
3543         if (rc == 0) {
3544                 void *handler;
3545                 handler = ptlrpcd_alloc_work(cli->cl_import,
3546                                              brw_queue_work, cli);
3547                 if (!IS_ERR(handler))
3548                         cli->cl_writeback_work = handler;
3549                 else
3550                         rc = PTR_ERR(handler);
3551         }
3552
3553         if (rc == 0) {
3554                 struct lprocfs_static_vars lvars = { 0 };
3555
3556                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3557                 lprocfs_osc_init_vars(&lvars);
3558                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3559                         lproc_osc_attach_seqstat(obd);
3560                         sptlrpc_lprocfs_cliobd_attach(obd);
3561                         ptlrpc_lprocfs_register_obd(obd);
3562                 }
3563
3564                 oscc_init(obd);
3565                 /* We need to allocate a few requests more, because
3566                    brw_interpret tries to create new requests before freeing
3567                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3568                    reserved, but I afraid that might be too much wasted RAM
3569                    in fact, so 2 is just my guess and still should work. */
3570                 cli->cl_import->imp_rq_pool =
3571                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3572                                             OST_MAXREQSIZE,
3573                                             ptlrpc_add_rqs_to_pool);
3574
3575                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3576
3577                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3578         }
3579
3580         if (rc)
3581                 ptlrpcd_decref();
3582         RETURN(rc);
3583 }
3584
3585 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3586 {
3587         int rc = 0;
3588         ENTRY;
3589
3590         switch (stage) {
3591         case OBD_CLEANUP_EARLY: {
3592                 struct obd_import *imp;
3593                 imp = obd->u.cli.cl_import;
3594                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3595                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3596                 ptlrpc_deactivate_import(imp);
3597                 cfs_spin_lock(&imp->imp_lock);
3598                 imp->imp_pingable = 0;
3599                 cfs_spin_unlock(&imp->imp_lock);
3600                 break;
3601         }
3602         case OBD_CLEANUP_EXPORTS: {
3603                 struct client_obd *cli = &obd->u.cli;
3604                 /* LU-464
3605                  * for echo client, export may be on zombie list, wait for
3606                  * zombie thread to cull it, because cli.cl_import will be
3607                  * cleared in client_disconnect_export():
3608                  *   class_export_destroy() -> obd_cleanup() ->
3609                  *   echo_device_free() -> echo_client_cleanup() ->
3610                  *   obd_disconnect() -> osc_disconnect() ->
3611                  *   client_disconnect_export()
3612                  */
3613                 obd_zombie_barrier();
3614                 if (cli->cl_writeback_work) {
3615                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3616                         cli->cl_writeback_work = NULL;
3617                 }
3618                 obd_cleanup_client_import(obd);
3619                 ptlrpc_lprocfs_unregister_obd(obd);
3620                 lprocfs_obd_cleanup(obd);
3621                 rc = obd_llog_finish(obd, 0);
3622                 if (rc != 0)
3623                         CERROR("failed to cleanup llogging subsystems\n");
3624                 break;
3625                 }
3626         }
3627         RETURN(rc);
3628 }
3629
3630 int osc_cleanup(struct obd_device *obd)
3631 {
3632         int rc;
3633
3634         ENTRY;
3635
3636         /* free memory of osc quota cache */
3637         osc_quota_cleanup(obd);
3638
3639         rc = client_obd_cleanup(obd);
3640
3641         ptlrpcd_decref();
3642         RETURN(rc);
3643 }
3644
3645 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3646 {
3647         struct lprocfs_static_vars lvars = { 0 };
3648         int rc = 0;
3649
3650         lprocfs_osc_init_vars(&lvars);
3651
3652         switch (lcfg->lcfg_command) {
3653         default:
3654                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3655                                               lcfg, obd);
3656                 if (rc > 0)
3657                         rc = 0;
3658                 break;
3659         }
3660
3661         return(rc);
3662 }
3663
3664 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3665 {
3666         return osc_process_config_base(obd, buf);
3667 }
3668
3669 struct obd_ops osc_obd_ops = {
3670         .o_owner                = THIS_MODULE,
3671         .o_setup                = osc_setup,
3672         .o_precleanup           = osc_precleanup,
3673         .o_cleanup              = osc_cleanup,
3674         .o_add_conn             = client_import_add_conn,
3675         .o_del_conn             = client_import_del_conn,
3676         .o_connect              = client_connect_import,
3677         .o_reconnect            = osc_reconnect,
3678         .o_disconnect           = osc_disconnect,
3679         .o_statfs               = osc_statfs,
3680         .o_statfs_async         = osc_statfs_async,
3681         .o_packmd               = osc_packmd,
3682         .o_unpackmd             = osc_unpackmd,
3683         .o_precreate            = osc_precreate,
3684         .o_create               = osc_create,
3685         .o_create_async         = osc_create_async,
3686         .o_destroy              = osc_destroy,
3687         .o_getattr              = osc_getattr,
3688         .o_getattr_async        = osc_getattr_async,
3689         .o_setattr              = osc_setattr,
3690         .o_setattr_async        = osc_setattr_async,
3691         .o_brw                  = osc_brw,
3692         .o_punch                = osc_punch,
3693         .o_sync                 = osc_sync,
3694         .o_enqueue              = osc_enqueue,
3695         .o_change_cbdata        = osc_change_cbdata,
3696         .o_find_cbdata          = osc_find_cbdata,
3697         .o_cancel               = osc_cancel,
3698         .o_cancel_unused        = osc_cancel_unused,
3699         .o_iocontrol            = osc_iocontrol,
3700         .o_get_info             = osc_get_info,
3701         .o_set_info_async       = osc_set_info_async,
3702         .o_import_event         = osc_import_event,
3703         .o_llog_init            = osc_llog_init,
3704         .o_llog_finish          = osc_llog_finish,
3705         .o_process_config       = osc_process_config,
3706         .o_quotactl             = osc_quotactl,
3707         .o_quotacheck           = osc_quotacheck,
3708         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
3709 };
3710
3711 extern struct lu_kmem_descr osc_caches[];
3712 extern cfs_spinlock_t       osc_ast_guard;
3713 extern cfs_lock_class_key_t osc_ast_guard_class;
3714
3715 int __init osc_init(void)
3716 {
3717         struct lprocfs_static_vars lvars = { 0 };
3718         int rc;
3719         ENTRY;
3720
3721         /* print an address of _any_ initialized kernel symbol from this
3722          * module, to allow debugging with gdb that doesn't support data
3723          * symbols from modules.*/
3724         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3725
3726         rc = lu_kmem_init(osc_caches);
3727
3728         lprocfs_osc_init_vars(&lvars);
3729
3730         osc_quota_init();
3731         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3732                                  LUSTRE_OSC_NAME, &osc_device_type);
3733         if (rc) {
3734                 lu_kmem_fini(osc_caches);
3735                 RETURN(rc);
3736         }
3737
3738         cfs_spin_lock_init(&osc_ast_guard);
3739         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3740
3741         osc_mds_ost_orig_logops = llog_lvfs_ops;
3742         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3743         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3744         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3745         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3746
3747         RETURN(rc);
3748 }
3749
3750 #ifdef __KERNEL__
3751 static void /*__exit*/ osc_exit(void)
3752 {
3753         osc_quota_exit();
3754         class_unregister_type(LUSTRE_OSC_NAME);
3755         lu_kmem_fini(osc_caches);
3756 }
3757
3758 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3759 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3760 MODULE_LICENSE("GPL");
3761
3762 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3763 #endif