- struct obd_device *obddev = (struct obd_device *) arg;
- struct ost_obd *ost = &obddev->u.ost;
- ENTRY;
-
- lock_kernel();
- daemonize();
- spin_lock_irq(&current->sigmask_lock);
- sigfillset(&current->blocked);
- recalc_sigpending(current);
- spin_unlock_irq(&current->sigmask_lock);
-
- sprintf(current->comm, "lustre_ost");
-
- /* Record that the thread is running */
- ost->ost_thread = current;
- wake_up(&ost->ost_done_waitq);
-
- /* XXX maintain a list of all managed devices: insert here */
-
- /* And now, wait forever for commit wakeup events. */
- while (1) {
- int rc;
-
- if (ost->ost_flags & OST_EXIT)
- break;
-
- wake_up(&ost->ost_done_waitq);
- interruptible_sleep_on(&ost->ost_waitq);
-
- CDEBUG(D_INODE, "lustre_ost wakes\n");
- CDEBUG(D_INODE, "pick up req here and continue\n");
-
-
- if (ost->ost_service != NULL) {
- ptl_event_t ev;
-
- while (1) {
- struct ptlrpc_request request;
- struct ptlrpc_service *service;
-
- rc = PtlEQGet(ost->ost_service->srv_eq_h, &ev);
- if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
- break;
-
- service = (struct ptlrpc_service *)ev.mem_desc.user_ptr;
-
- /* FIXME: If we move to an event-driven model,
- * we should put the request on the stack of
- * mds_handle instead. */
- memset(&request, 0, sizeof(request));
- request.rq_reqbuf = ev.mem_desc.start +
- ev.offset;
- request.rq_reqlen = ev.mem_desc.length;
- request.rq_ost = ost;
- request.rq_xid = ev.match_bits;
-
- request.rq_peer.peer_nid = ev.initiator.nid;
- /* FIXME: this NI should be the incoming NI.
- * We don't know how to find that from here. */
- request.rq_peer.peer_ni =
- ost->ost_service->srv_self.peer_ni;
- rc = ost_handle(obddev, &request);
-
- /* Inform the rpc layer the event has been handled */
- ptl_received_rpc(service);
- }
- } else {
- struct ptlrpc_request *request;
-
- if (list_empty(&ost->ost_reqs)) {
- CDEBUG(D_INODE, "woke because of timer\n");
- } else {
- request = list_entry(ost->ost_reqs.next,
- struct ptlrpc_request,
- rq_list);
- list_del(&request->rq_list);
- rc = ost_handle(obddev, request);
- }
- }
- }
-
- /* XXX maintain a list of all managed devices: cleanup here */
-
- ost->ost_thread = NULL;
- wake_up(&ost->ost_done_waitq);
- printk("lustre_ost: exiting\n");
- return 0;
+ struct ptlrpc_bulk_desc *desc;
+ struct obd_conn conn;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb, *lnb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
+ void *tmp1, *tmp2, *end2;
+ ENTRY;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
+ objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
+ niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
+ cmd = body->data;
+
+ conn.oc_id = body->connid;
+ conn.oc_dev = req->rq_obd->u.ost.ost_tgt;
+
+ for (i = 0; i < objcount; i++) {
+ ost_unpack_ioo((void *)&tmp1, &ioo);
+ if (tmp2 + ioo->ioo_bufcnt > end2) {
+ rc = -EFAULT;
+ break;
+ }
+ for (j = 0; j < ioo->ioo_bufcnt; j++)
+ ost_unpack_niobuf((void *)&tmp2, &remote_nb);
+ }
+
+ size[1] = niocount * sizeof(*remote_nb);
+ rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+ if (rc)
+ GOTO(fail, rc);
+ remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
+
+ OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+ if (local_nb == NULL)
+ GOTO(fail, rc = -ENOMEM);
+
+ /* The unpackers move tmp1 and tmp2, so reset them before using */
+ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
+ tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
+ req->rq_status = obd_preprw(cmd, &conn, objcount,
+ tmp1, niocount, tmp2, local_nb);
+ if (req->rq_status)
+ GOTO(success, 0);
+
+ desc = ptlrpc_prep_bulk(req->rq_connection);
+ if (desc == NULL)
+ GOTO(fail_preprw, rc = -ENOMEM);
+ desc->b_cb = ost_brw_write_finished_cb;
+ desc->b_portal = OSC_BULK_PORTAL;
+ memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+ /* Save journal context for commit callbacks */
+ CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current,
+ current->journal_info);
+ desc->b_journal_info = current->journal_info;
+
+ for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
+ struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+ struct ptlrpc_bulk_page *bulk;
+
+ bulk = ptlrpc_prep_bulk_page(desc);
+ if (bulk == NULL)
+ GOTO(fail_bulk, rc = -ENOMEM);
+
+ spin_lock(&srv->srv_lock);
+ bulk->b_xid = srv->srv_xid++;
+ spin_unlock(&srv->srv_lock);
+
+ bulk->b_buf = (void *)(unsigned long)lnb->addr;
+ bulk->b_page = lnb->page;
+ bulk->b_buflen = PAGE_SIZE;
+ bulk->b_cb = ost_brw_write_cb;
+
+ /* this advances remote_nb */
+ ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+ bulk->b_xid);
+ }
+
+ rc = ptlrpc_register_bulk(desc);
+ current->journal_info = NULL; /* kind of scary */
+ if (rc)
+ GOTO(fail_bulk, rc);
+
+ EXIT;
+ success:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ return 0;
+
+ fail_bulk:
+ ptlrpc_free_bulk(desc);
+ fail_preprw:
+ OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+ /* FIXME: how do we undo the preprw? */
+ fail:
+ return rc;