Changelog:

The originally included version of test_util.py used by make failure_test sometimes checked the content returned from a worker’s GetCommitted() method when the worker indicated the value was unavailable, which it should not have done. You can get an updated version of test_util.py that does not do that here.

If you downloaded the skeleton code before 21 November 2019 7PM, then the skeleton .proto file did not include the SetValue worker method used by our example base code (and might have included additional “Student added methods” that you don’t need to use). Either add the SetValue method or get an updated version of the skeleton code.

Your Task

  1. Download our template code (last updated very late 30 November 2019) that implements a remote procedure call-based service that stores a single string redundantly on multiple “workers”, using a single “coordinator” to update the common value.

    Each worker and coordinator can be run as a separate program, but our tests run them all in one process. We use the gRPC remote procedure call (RPC) library. Although our RPC library is intended to operate remotely over IP (internet protocol)-based sockets (represented by an IP address and port number), our tests use “Unix domain” sockets, which are represented by a file.

    The intention is that the value is only updated through the coordinator but can always be retrieved from any one of the workers, even if the others are inaccessible or down. But, we give priority to consistency over availability. This means that rather than ever allowing one worker to report that the value is currently A and another to report that it is currently B, we would rather that some workers instead indicate that the value is unavailable.

    The Coordinator’s RPC interface has one method you must implement called SetValue() that updates the stored value on all workers (or returns an error if it fails), and the Worker’s RPC interface has one method you must implement called GetCommitted() that returns the current value if it’s available. (Initially, the value is empty/None.) We describe these in more detail below.

  2. To simplify the assignment, all persistent data will be stored in a very simple persistent log. In a “real” database, most likely there would be separate updates to the log and to the actually stored data on disk on the workers. In your case, you will update a copy of the stored value in memory and write log entries that contain information about updates being performed (and the state of the worker/coordinators). Whenever a worker starts up, it will read its currently stored value from the last log entry into memory. While it works, it will update its value in memory and also update the log.

    (If you don’t want to keep the stored value in memory yourself, you may also choose to always reread the log to figure it out, since our tests won’t be able to tell the difference.)

    To further simplify this log, the log will only keep one entry. When you write a new log entry in this system, the prior log entry is discarded. This means that rather than scanning through all the log entries, you will need to rewrite any information you wanted to keep around in each new log entry. Most likely, this means that each worker log entry will contain a copy of the current stored value, even if it is not changing.

  3. Build the template code using the instructions below and the supplied Makefile and run the tests with make no_fail_test and make failure_test (see the supplied tests section below for more detail).

    (Before running these tests, if you downloaded the template code on or before 30 November, update your test_util.py to this version.)

    I also recommend experimenting with running the code manually.

  4. Our template code provides a naive implementation which does not provide consistency. This manifests in two ways:

    • while the value is being changed from A to B, workers will not be consistent about what the value is. So, if there are two workers, it’s possible to observe value A on worker 1, then value B on worker 1, then value A on worker 2, then value B on worker 2. This happens because the naive coordinator changes the value on worker 1 before changing the value on worker 2. This gives the observer the erroneous impression that, in addition to changing from A to B, the value changed from B to A and from A to B a second time.

    • if a failure occurs, one or more workers may disagree on the value indefinitely. For example, if there are two workers, but worker 2 is temporarily inaccessible, changing the value from A to B will return an error. But, after this error is returned worker 1 will indicate that the value is B, but worker 2 will indicate that the value is A.

    Your job will be to modify the coordinator and worker code to fix these inconsistencies using two-phase commit.

    Your implementation must:

    1. Only use RPC calls to communicate between the coordinator and workers. We rely on injecting failures in these RPC calls to test your implementation. Our tests happen to run the workers and coordinator in a single process on a single machine, because this makes the tests much easier to write. But your code must work if we run them in separate processes on separate machines.

    2. Use the supplied PersistentLog implementation (described below) to store any data that must be saved in the event that a worker or coordinator fails or is shut down.

      To inject failures, our tests use a PersistentLog subclass that sometimes throws an exception. You may not catch this exception.

    3. Only run added code in response to RPC calls or when a coordinator or worker is constructed. This is unlike how a typical two-phase commit system would work. There, in the event of a failure, the system would typically retry an apparently failed operation after a timeout. To make testing easier, you should not use timeouts to trigger these “recovery” operations. Instead:

      • if your coordinator has problems contacting a worker in its SetValue() operation, it should cause the SetValue() operation to fail immediately (e.g. by throwing an exception) rather than retrying to contact the worker.

        After this occurs, it is okay if some workers indicate the current value is unavailable. (However, all workers that return a stored value must return the same value. This could either be the original value (from before SetValue() was called) or the new value (supplied to SetValue()) depending on your implementation and when the communication failure occurred.)

      • whenever your coordinator is created, before returning from its constructor, it should communicate with all the workers to make the currently stored value available from all of them. If communicating with a worker fails during this, your constructor must throw an exception rather than retrying the communication. (Our testing code will try to create your coordinator again.)

    4. Not attempt to have workers communicate directly with each other (that is, not via remote procedure calls). In some two-phase commit systems, workers coordinate directly so they can figure out whether transactions should commit or abort even if a coordinator fails. This is not required, and since our tests are not built to facilitate this, you must not do this.

  5. If you downloaded the skeleton code before late 27 November 2019, add the line SUBMIT_FILENAME=twophase-$(shell date +%Y%m%d%H%M%S).tar.gz to your Makefile to make the make submit target work (or replace it with this updated Makefile).

    Then run make submit to create a .tar.gz and upload it for submission.

    Alternately, make a .tar.gz manually.
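
The requirements in step 4 can be illustrated with a small in-memory sketch. It uses neither gRPC nor the supplied PersistentLog, and every name in it (ToyWorker, prepare, commit, toy_set_value, the reachable flag) is hypothetical rather than part of the template code. It only shows the general shape of two-phase commit: a worker stops reporting a value while a change is pending, and the coordinator-side function fails immediately, without retrying, if a worker cannot be contacted. Logging and recovery after a restart are deliberately left out.

```python
class ToyWorker:
    """Hypothetical in-memory stand-in for a worker (no gRPC, no log)."""

    def __init__(self):
        self.committed = None   # value reported while available
        self.pending = None     # value proposed by an in-progress change
        self.available = True
        self.reachable = True   # flipped by the demo to simulate a failure

    def _check(self):
        if not self.reachable:
            raise ConnectionError("worker unreachable")

    def prepare(self, value):
        # Phase 1: remember the proposal and stop reporting a value.
        self._check()
        self.pending = value
        self.available = False

    def commit(self):
        # Phase 2: adopt the proposal and report a value again.
        self._check()
        self.committed = self.pending
        self.pending = None
        self.available = True

    def get_committed(self):
        # Mirrors the idea of GetCommitted(): an (available, content) pair.
        return (self.available, self.committed if self.available else None)


def toy_set_value(workers, value):
    """Run both phases; any ConnectionError propagates immediately."""
    for worker in workers:
        worker.prepare(value)
    for worker in workers:
        worker.commit()


workers = [ToyWorker(), ToyWorker()]
toy_set_value(workers, "A")        # no failure: both workers adopt "A"
workers[1].reachable = False       # simulate an inaccessible worker
try:
    toy_set_value(workers, "B")    # fails during the prepare phase
except ConnectionError:
    pass
print([worker.get_committed() for worker in workers])
```

After the failed call in this sketch, the first worker reports the value as unavailable and the second still reports A, so no two workers report different values. Making the value available again would be the job of the recovery step that runs when the coordinator is constructed.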

Building

We recommend using python’s “virtual environments” feature to install the dependencies for this package locally rather than requiring them to be installed globally. In particular, this avoids problems with different programs requiring different versions of the libraries we use. We have supplied a script to assist with this:

Using manually

To use this system, you need to start one or more worker servers, then start a coordinator server, with the address of each of the worker servers. The coordinator will act as both a server (to receive commands to set values) and a client to each of the workers. The workers will act as a server both for the coordinator (for the commands to set values and, when you implement them, the commands that are part of the two-phase commit protocol) and for programs that query the current value.

Specifying addresses

The RPC system we use, gRPC, supports two types of server addresses. For testing we recommend primarily using the second type which only works locally:

Starting servers

To start a worker, you can use a command like

      python ./worker.py unix:first-worker-socket first-worker-log &

    

The first argument specifies where the worker’s RPC server will listen for calls; in this example, it will use a socket file called first-worker-socket in the current directory.

The second argument specifies the log file where the worker’s log will be stored. It will be created if it does not exist. Just after it is created, the corresponding PersistentLog object passed to the worker will return None from get_last_log().

& says to run the command in the background; you can omit it, but then you won’t be able to easily run other commands in the same terminal while the worker is running.

To start a coordinator, you can use a command like:

      python ./coordinator.py unix:coordinator-socket coordinator-log unix:first-worker-socket unix:second-worker-socket

    

The first argument specifies where the coordinator’s RPC server will listen for calls; in this case, a socket file called coordinator-socket.

The second argument specifies the log file where the coordinator’s log will be stored. It will be created if it does not exist. Just after it is created, the corresponding PersistentLog object passed to the coordinator will return None from get_last_log().

The remaining arguments specify how to connect to the workers; the number of arguments supplied must correspond to the number of workers you want to use (which can be as few as 1). Code we supply will create stub objects for each of these workers before starting the coordinator.

Sending commands to servers

After starting servers, you can use the two utility programs we supply for sending commands:

set_value.py

Running a command like:

      python ./set_value.py unix:coordinator-socket SomeValue

    

will call the SetValue() method on the coordinator server specified by the first argument with a content string of SomeValue. If the coordinator returns an error, it will crash with a message about an exception being thrown.

If you get an error from the coordinator containing text like Exception calling application: 'WorkerStub' object has no attribute 'SetValue' then this indicates that you have an old version of twophase.proto which was missing the declaration of the SetValue RPC method included in our base code. (See note at top of this writeup.)

get_value.py

Running a command like:

      python ./get_value.py unix:first-worker-socket

    

will call the GetCommitted() method on the worker specified by the first argument and display the result.

If the worker’s return value indicates the value is unavailable, it will print value is UNAVAILABLE. If it indicates the value is available and equal to the string SomeValue, it will print value is AVAILABLE and SomeValue. If the worker returns an error from the call, it will crash with a message about an exception being thrown.

Supplied Tests

We have supplied several tests based on Python’s built-in unittest library. We supply makefile targets that run each of the tests, or you can run Python directly using similar commands. As run in the Makefile, the tests will stop at the first failure, and if they print no messages about failures, all the tests passed.

Each of these tests runs the coordinator and one or more workers using Unix-domain sockets located in a temporary directory. These tests supply a PersistentLog object to the coordinator and workers which is stored in memory rather than in a file on disk.

To test failures, the tests run servers that act as proxies between the coordinator and workers. Each such server takes a remote procedure call intended for a worker and does one of the following:

The PersistentLog object we provide also supports injecting failures by throwing an exception.

no_fail_tests

This file contains tests where there are no injected failures (but the worker and coordinator are restarted to ensure that the persistent log is in use).

The primary thing these tests try to check is that when the stored value is being changed from A to B, the workers never disagree about the current value.

To do this, these tests repeatedly check whether all workers agree on the current value while the value is being changed. To ensure that the value is consistent at all times, we intercept messages between the coordinator and workers. Before and after the coordinator makes an RPC call to any worker, we ask the workers what their current values are to make sure they are consistent.

Since we do this check in response to each message you send, when this check fails, it may appear as if it’s part of sending that message failing.

In the naive implementation, when the value is changing from A to B, some workers start reporting the new value B while other workers are still reporting the old value A. You must fix the implementation so that when a worker starts reporting the new value B, all other workers either report the same value or report that the value is unavailable.
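
The consistency rule described above can be written down as a small predicate. This is only a sketch of the idea, not the code our tests use: workers_consistent is a hypothetical helper, and it assumes each worker's answer has already been reduced to an (available, content) pair, mirroring the two fields of the MaybeValue message.

```python
def workers_consistent(responses):
    """Return True if all workers that report a value report the same one.

    responses is a list of (available, content) pairs, one per worker.
    Workers that report the value as unavailable are ignored, since being
    unavailable never counts as disagreeing.
    """
    values = {content for available, content in responses if available}
    return len(values) <= 1


# One worker is unavailable, the other two agree: consistent.
print(workers_consistent([(True, "B"), (False, ""), (True, "B")]))
# Two available workers disagree: this is what the naive code can produce.
print(workers_consistent([(True, "A"), (True, "B")]))
```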

failure_test

There is an updated version of test_util.py released on 30 November 2019 that fixes a bug where these tests were excessively sensitive. You can download that here.

This tests a variety of circumstances involving injected failures. Most tests are parameterized to vary things like the number of workers and when the injected failure occurs. (Our intention is that these tests should find many bugs, but they are definitely not exhaustive. Most notably the tests only try so many scenarios and if you use more or fewer messages, you might need to inject failures differently.)

Files in the distribution

Hints

Using gRPC

  1. To create or modify a method in an RPC service, you need to

    • add or modify the method to the twophase.proto file for the service
    • add or modify the method to the corresponding class in coordinator.py and worker.py. It should always take exactly three arguments:
      • self (the service object),
      • request (the argument to the method, which is a message declared in twophase.proto), and
      • context, which provides access to utility functions for the RPC system, such as to send back errors

    and then run make to regenerate twophase_pb2.py and twophase_pb2_grpc.py based on twophase.proto.

  2. To return an error from an RPC method, you can use code like

    context.abort(grpc.StatusCode.INTERNAL, 'message')
    

    where grpc.StatusCode.INTERNAL is a status code taken from the list here, and 'message' is a message of your choice. In the client calling the RPC service, this error will turn into a Python exception.

  3. When an RPC method fails, grpc throws an exception that inherits from grpc.RpcError. You could catch this exception to handle it explicitly, but in my reference implementation, I do not do this. (I just rely on the exception “crashing” my coordinator, and assume that the coordinator will be restarted to recover from this.)

Understanding gRPC errors

  1. If an exception occurs during a method in a service, then gRPC will catch the exception and return an error from the method, with information about the exception embedded in the error. When the client receives this error, this will result in another exception. Since sometimes our tests will call RPC methods which call other methods, this can lead to rather long errors. For example, in my reference implementation if I add raise Exception("THIS IS THE ACTUAL PROBLEM") to a worker method called by the coordinator, then I get a stream of error messages like:

    ERROR:grpc._server:Exception calling application: THIS IS THE ACTUAL PROBLEM
    Traceback (most recent call last):
      File "/net/zf14/cr4bd/fall2019/cs4414/hw/2p-py/lib/python3.6/site-packages/grpc/_server.py", line 434, in _call_behavior
        response_or_iterator = behavior(argument, context)
      File "/u/cr4bd/fall2019/cs4414/hw/2p-py/worker.py", line 53, in Commit
        raise Exception("THIS IS THE ACTUAL PROBLEM")
    Exception: THIS IS THE ACTUAL PROBLEM
    

    This part of the error message is written by the worker just before the RPC library sends back the error to the caller.

    ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >
    Traceback (most recent call last):
    

    … (omitted several lines) …

    When the error is received at the place in the coordinator where the call was made, it triggers an exception. This is a printout of that exception. Note that the debug_error_string includes the original exception message along with information about the RPC call that was made and the part of the RPC library which handled it.

    ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >"
    

    (… omitted more lines …)

    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >"
            debug_error_string = "{"created":"@1572716887.781029247","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-forward-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: <_Rendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNKNOWN\n\tdetails = "Exception calling application: THIS IS THE ACTUAL PROBLEM"\n\tdebug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"\n>","grpc_status":2}"
    >

    Since I did not catch the exception in the coordinator, it also triggered an exception in the program that called the coordinator. As a result we got a third and fourth, even longer message.

    You probably want to look for the first error, or at least one of the early ones, to diagnose what is going on.

    Note that in some of our tests, we deliberately inject communication errors from gRPC. Error messages triggered by these injected errors are likely normal, so you should not conclude that seeing error messages based on this is abnormal without further investigation.
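
When the output is long, a rough text search can help locate the original message inside the nesting. The helper below is a heuristic sketch, not something supplied with the assignment: root_cause is a hypothetical name, and it assumes the innermost message is whatever follows the last occurrence of the "Exception calling application: " marker, up to the next quote, backslash, or newline, as in the sample output above.

```python
import re

MARKER = "Exception calling application: "


def root_cause(error_text):
    """Return the text after the last marker, or None if there is no marker."""
    start = error_text.rfind(MARKER)
    if start == -1:
        return None
    rest = error_text[start + len(MARKER):]
    # Stop at the first quote, newline, or backslash that follows the message.
    return re.split(r'["\n\\]', rest, maxsplit=1)[0].strip()


sample = (
    'details = "Exception calling application: <_Rendezvous of RPC that '
    'terminated with:\n\tstatus = StatusCode.UNKNOWN\n\tdetails = '
    '"Exception calling application: THIS IS THE ACTUAL PROBLEM"\n>"'
)
print(root_cause(sample))
```

This only narrows down where to look; the traceback printed with the first error is still the most reliable place to find the failing line.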

The persistent log

  1. You should use the persistent log to save the state of the coordinator and each worker. This includes both the value they are storing and what stage of a transaction they are in.

    On the workers, in order to return the correct value from GetCommitted, you will need to store the actual string stored. On the coordinator, in order to handle a worker which failed to receive a transaction starting message, you will likely need to store the actual string as well as information about any transaction that is in progress.

    Updates to the persistent log are atomic. After updating the log, you will either see the previous or current version of the string.
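
As a rough illustration of keeping all state in a single log entry, here is a sketch that uses an in-memory stand-in instead of the supplied PersistentLog. Only get_last_log() is taken from this writeup; the set_last_log() name, the use of a dict as the entry, and the ToyLog and ToyWorkerState classes are assumptions made for the example, so check the supplied PersistentLog for its actual interface.

```python
class ToyLog:
    """Hypothetical in-memory stand-in for PersistentLog (one entry only)."""

    def __init__(self):
        self._entry = None

    def get_last_log(self):
        # None until something has been written, like a freshly created log.
        return self._entry

    def set_last_log(self, entry):
        # Assumed method name: writing replaces the previous entry.
        self._entry = entry


class ToyWorkerState:
    """Keeps the value and transaction stage in memory and in the log."""

    def __init__(self, log):
        self.log = log
        entry = log.get_last_log()
        if entry is None:
            self.value, self.stage = None, "idle"
        else:
            self.value, self.stage = entry["value"], entry["stage"]

    def record(self, value, stage):
        # Every entry repeats the stored value, even if only the stage changed,
        # because the previous entry is discarded.
        self.log.set_last_log({"value": value, "stage": stage})
        self.value, self.stage = value, stage


log = ToyLog()
state = ToyWorkerState(log)
state.record("A", "idle")
restarted = ToyWorkerState(log)   # simulates a worker restart
print(restarted.value, restarted.stage)
```

Writing the log before updating memory is one reasonable ordering here: if the write throws, the in-memory copy has not yet diverged from what a restarted worker would read back.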

Adding messages

  1. You can add new message types to twophase.proto using similar syntax to the MaybeValue declared in our skeleton code:

    message MaybeValue {
        bool available = 1;
        string content = 2;
    }
    

    Each field in the message needs to be assigned a unique number (and this number identifies that field when messages are sent over the network, not its name). Each field can have types like bool, string, int32, float, etc. You can see the full list of types in the protocol buffer documentation along with more complete reference on the supported syntax for messages.

  2. You can add or replace the methods of the worker in twophase.proto that the coordinator uses to communicate with the worker. Our skeleton code uses a SetValue() method that takes the new value as an argument, but this provides the worker too little information to implement its part of the distributed transaction.

Transaction IDs and Sequence Numbers

  1. It is possible for the coordinator to try to send a message to the worker and for the message to appear not to be sent. When this happens, it is possible that the message does not actually reach the worker, that the message does reach the worker immediately, or that the message reaches the worker sometime later. (Our tests create this situation deliberately. In a real network, it could happen as a result of some component, perhaps between the coordinator and worker machines, trying to resend a message after a failure.)
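
One common way to cope with late or repeated messages is to attach an increasing sequence number to each message and have the receiver ignore anything it has already passed. The sketch below assumes that approach; SequencedReceiver and its fields are hypothetical names, not part of the template code, and a real worker would also need to keep the last seen number in its persistent log so that it survives a restart.

```python
class SequencedReceiver:
    """Hypothetical worker-side filter for stale or duplicated messages."""

    def __init__(self):
        self.last_seen = 0    # highest sequence number handled so far
        self.applied = []     # payloads that were actually acted on

    def receive(self, sequence_number, payload):
        """Apply the payload unless its sequence number is not new."""
        if sequence_number <= self.last_seen:
            return False      # late or repeated delivery: ignore it
        self.last_seen = sequence_number
        self.applied.append(payload)
        return True


receiver = SequencedReceiver()
receiver.receive(1, "prepare B")
receiver.receive(2, "commit B")
receiver.receive(1, "prepare B")   # delayed copy of the first message
print(receiver.applied)
```

In this sketch the delayed copy of message 1 is dropped, so the receiver does not go back to a prepared state after it has already committed.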