Changelog:

Your Task

  1. Download our template code (last updated 30 April 2022) that implements a remote procedure call-based service that stores a single string redundantly on multiple “workers”, using a single “coordinator” to update the common value.

    Each worker and coordinator can be run as a separate program, but our tests run them all in one process. We use the gRPC remote procedure call (RPC) library. Although our RPC library is intended to operate remotely over IP (Internet Protocol)-based sockets (represented by an IP address and port number), our tests use “Unix domain” sockets, which are represented by a file.

    The intention is that the value is only updated through the coordinator but can always be retrieved from any one of the workers, even if the others are inaccessible or down. However, we give priority to consistency over availability. This means that rather than ever allowing one worker to report that the value is currently A and another to report that it is currently B, we would rather have some workers indicate that the value is unavailable.

    The Coordinator’s RPC interface has one method you must implement, called SetValue(), that updates the stored value on all workers (or returns an error if it fails), and the Worker’s RPC interface has one method you must implement, called GetCommitted(), that returns the current value if it’s available. (Initially, the value is empty/None.) We describe these in more detail below.

  2. To simplify the assignment, all persistent data will be stored in a very simple persistent log. In a “real” database, most likely there would be separate updates to the log and to the actually stored data on disk on the workers. In your case, you will update a copy of the stored value in memory and write log entries that contain information about updates being performed (and the state of the workers and coordinator). Whenever a worker starts up, it will read its currently stored value from the last log entry into memory. While it works, it will update its value in memory and also update the log.

    (If you don’t want to keep the stored value in memory yourself, you may also choose to always reread the log to figure it out, since our tests won’t be able to tell the difference.)

    To further simplify this log, the log will only keep one entry. When you write a new log entry in this system, the prior log entry is discarded. This means that rather than scanning through all the log entries, you will need to rewrite any information you wanted to keep around in each new log entry. Most likely, this means that each worker log entry will contain a copy of the current stored value, even if it is not changing.

  3. Build the template code using the instructions below and the supplied Makefile and run the tests with make no_fail_test and make failure_test (see the supplied tests section below for more detail).

    I also recommend experimenting with running the code manually.

  4. Our template code provides a naive implementation which does not provide consistency. This manifests in two ways:

    • while the value is being changed from A to B, workers will not be consistent about what the value is. So, if there are two workers, it’s possible to observe value A on worker 1, then value B on worker 1, then value A on worker 2, then value B on worker 2. This happens because the naive coordinator changes the value on worker 1 before changing the value on worker 2. This gives the observer the erroneous impression that, in addition to changing from A to B, the value changed from B to A and from A to B a second time.

    • if a failure occurs, one or more workers may disagree on the value indefinitely. For example, if there are two workers, but worker 2 is temporarily inaccessible, changing the value from A to B will return an error. But after this error is returned, worker 1 will indicate that the value is B, while worker 2 will indicate that the value is A.

    Your job will be to modify the coordinator and worker code to fix these inconsistencies using two-phase commit.

    Your implementation must:

    1. Only use RPC calls initiated by the coordinator to communicate between the coordinator and workers. We rely on injecting failures in these RPC calls to test your implementation. Our tests happen to run the workers and coordinator in a single process on a single machine, because this makes the tests much easier to write. But your code must work if we run them in separate processes on separate machines.

    2. Use the supplied PersistentLog implementation (described below) to store any data that must be saved in the event that a worker or coordinator fails or is shut down.

      To inject failures, our tests use a PersistentLog subclass that sometimes throws an exception. You may not catch this exception.

    3. Only run added code in response to RPC calls or when a coordinator or worker is constructed, and (therefore) trigger recovery from failures by having the testing code restart your coordinator and/or worker.

      This is unlike how a typical two-phase commit system would work. There, in the event of a failure, the system would typically retry an apparently failed operation after a timeout. To make testing easier, you should not use timeouts to trigger these “recovery” operations. Instead:

      • if your coordinator has problems contacting a worker in its SetValue() operation, it should cause the SetValue() operation to fail immediately (e.g. by throwing an exception) rather than retrying to contact the worker.

        After this occurs, it is okay if some workers indicate the current value is unavailable. (However, all workers that return a stored value must return the same value. This could be either the original value (from before SetValue() was called) or the new value (supplied to SetValue()), depending on your implementation and when the communication failure occurred.)

      • whenever your coordinator is created, before returning from its constructor, it should communicate with all the workers to make the currently stored value available from all of them. If communicating with a worker fails during this, your constructor must throw an exception rather than retrying the communication. (Our testing code will try to create your coordinator again.)

    4. Not attempt to have workers communicate directly with each other (that is, not via remote procedure calls). In some two-phase commit systems, workers coordinate directly so they can figure out whether transactions should commit or abort even if a coordinator fails. This is not required, and since our tests are not built to facilitate this, you must not do this.

  5. Run make submit to create a .tar.gz and upload it for submission.

    Alternately, make a .tar.gz of your code manually and upload it.

Building

We recommend using Python’s “virtual environments” feature to install the dependencies for this package locally rather than requiring them to be installed globally. In particular, this avoids problems with different programs requiring different versions of the libraries we use. We have supplied a script to assist with this:

Using manually

To use this system, you need to start one or more worker servers, then start a coordinator server with the address of each of the worker servers. The coordinator will act as both a server (to receive commands to set values) and a client to each of the workers. The workers will act as a server both for the coordinator (for the commands to set values and, once you implement it, the commands that are part of the two-phase commit protocol) and for programs that query the current value.

Specifying addresses

The RPC system we use, gRPC, supports two types of server addresses. For testing, we recommend primarily using the second type, which only works locally:

Starting servers

To start a worker, you can use a command like

      python ./worker.py unix:first-worker-socket first-worker-log &

    

The first argument specifies where the worker’s RPC server will listen for calls; in this example, it will use a socket file called first-worker-socket in the current directory.

The second argument specifies the log file where the worker’s log will be stored. It will be created if it does not exist. Just after it is created, the corresponding PersistentLog object passed to the worker will return None from get_last_log_entry().

& says to run the command in the background; you can omit it, but then you won’t be able to easily run other commands in the same terminal while the worker is running.

To start a coordinator, you can use a command like:

      python ./coordinator.py unix:coordinator-socket coordinator-log unix:first-worker-socket unix:second-worker-socket

    

The first argument specifies where the coordinator’s RPC server will listen for calls; in this case, a socket file called coordinator-socket.

The second argument specifies the log file where the coordinator’s log will be stored. It will be created if it does not exist. Just after it is created, the corresponding PersistentLog object passed to the coordinator will return None from get_last_log_entry().

The remaining arguments specify how to connect to the workers; the number of arguments supplied must correspond to the number of workers you want to use (which can be as few as 1). Code we supply will create stub objects for each of these workers before starting the coordinator.

Note on shutting down servers

If you’re running a server in the background, it will continue running. If you want to stop it you can use

      kill PID

    

where PID is the process ID. When you start a server in the background, your shell may print out something like [1] 12345, in which case the process ID is 12345. Otherwise, you can use something like ps -ax -u $USER | grep python to find the process ID (process IDs are the first column of this command’s output).

Sending commands to servers

After starting servers, you can use two utility programs we supply for sending commands:

set_value.py

Running a command like:

      python ./set_value.py unix:coordinator-socket SomeValue

    

will call the SetValue() method on the coordinator server specified by the first argument with a content string of SomeValue. If the coordinator returns an error, it will crash with a message about an exception being thrown.

get_value.py

Running a command like:

      python ./get_value.py unix:first-worker-socket

    

will call the GetCommitted() method on the worker specified by the first argument and display the result.

If the worker’s return value indicates the value is unavailable, it will print value is UNAVAILABLE. If it indicates the value is available and equal to the string SomeValue, it will print value is AVAILABLE and SomeValue. If the worker returns an error from the call, it will crash with a message about an exception being thrown.

Supplied Tests

We have supplied several tests based on Python’s built-in unittest library. We supply Makefile targets that run each of the tests, or you can run Python directly using similar commands. As run in the Makefile, the tests will stop at the first failure; if they print no messages about failures, all the tests passed.

Each of these tests runs the coordinator and one or more workers using Unix-domain sockets located in a temporary directory. These tests supply a PersistentLog object to the coordinator and workers which is stored in memory rather than in a file on disk.

To test failures, the tests run servers that act as proxies between the coordinator and workers. Each such server takes a remote procedure call intended for a worker and does one of the following:

The PersistentLog object we provide also supports injecting failures by throwing an exception.

no_fail_test

If this test runs very slowly, see whether running pip install grpcio==1.44.0 and then rerunning the test makes it faster.

This file contains tests where there are no injected failures (but the worker and coordinator are restarted to ensure that the persistent log is in use).

The primary thing these tests try to check is that when the stored value is being changed from A to B, the workers are consistent with each other. We consider values inconsistent if, after any worker starts returning B, another worker returns A. To do this, these tests repeatedly check whether all workers agree on the current value while the value is being changed. To ensure that the value is consistent at all times, we intercept messages between the coordinator and workers. Before and after the coordinator makes an RPC call to any worker, we ask each worker for its current value to make sure the values are consistent.
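The consistency rule above can be expressed as a small check over a recorded sequence of observations. This is a minimal sketch, not the code in no_fail_test.py; the function name find_inconsistency and the (worker_id, value) representation, with None meaning “unavailable”, are hypothetical.

```python
def find_inconsistency(observations, old, new):
    """Return the index of the first inconsistent observation, or None.

    observations: (worker_id, value) pairs in the order they were made;
    value is None when the worker reports the value as unavailable.
    While changing from `old` to `new`, the history is inconsistent if any
    worker reports `old` after some worker has already reported `new`.
    """
    seen_new = False
    for index, (_worker_id, value) in enumerate(observations):
        if value == new:
            seen_new = True
        elif value == old and seen_new:
            return index
    return None


# The naive coordinator's behavior: A then B on worker 1, then A on worker 2.
naive = [(1, "A"), (1, "B"), (2, "A"), (2, "B")]
print(find_inconsistency(naive, "A", "B"))  # 2

# Reporting "unavailable" instead of the old value keeps the history consistent.
two_phase = [(1, "A"), (2, "A"), (1, None), (2, None), (1, "B"), (2, None), (2, "B")]
print(find_inconsistency(two_phase, "A", "B"))  # None
```

Note that "unavailable" never counts against consistency, which is why trading availability for consistency is allowed.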

Since we do this check in response to each message you send, when this check fails, it may appear as if it’s part of sending that message failing.

In the naive implementation, when the value is changing from A to B, some workers start reporting the new value B while other workers are still reporting the old value A. You must fix the implementation so that when a worker starts reporting the new value B, all other workers either report the same value or report that the value is unavailable.

failure_test

If you experience an error about “too many open files” when running this test on our VM, try running on the department machines instead. Or alternately, try running the command

  ulimit -n 10240

to increase the limit on the number of open file descriptors.

Alternately, download this version of failure_test.py with the tests split into smaller groups and run each of the test groups separately:

  python failure_test_split.py SetOneValueNoFailuresTest
  python failure_test_split.py ChangeValueNoFailuresTest
  python failure_test_split.py ChangeValueReorderingTest
  python failure_test_split.py ChangeValueFailuresTest
  python failure_test_split.py ChangeValueLogFailuresTest

This tests a variety of circumstances involving injected failures. Most tests are parameterized to vary things like the number of workers and when the injected failure occurs. (Our intention is that these tests should find many bugs, but they are definitely not exhaustive. Most notably the tests only try so many scenarios and if you use more or fewer messages, you might need to inject failures differently.)

running individual tests

You can run a particular test manually using a command like

      python no_fail_test.py NoFailTest.test_change_value_once_0

    

In this command, NoFailTest is the name of the class (“test suite”) the test is a part of, which you can tell from the class NoFailTest...: line that precedes it in no_fail_test.py. One of the tests in no_fail_test.py is declared like:

      @parameterized.expand([
          (1,),
          (2,),
          (3,),
      ])
      def test_change_value_once(self, num_workers):
          ...

    

The @parameterized.expand operation creates three variants of this test named test_change_value_once_0, test_change_value_once_1 and test_change_value_once_2. So the command above selects the first variant where the num_workers argument is 1.

Files in the distribution

Hints

General approach

  1. I recommend first familiarizing yourself with how to run the coordinator and worker manually (without running one of the test files).

  2. You will need to modify how the coordinator communicates with the worker. In the skeleton code, the coordinator’s SetValue() method makes a single SetValue() call to each worker. You will need to change this to correspond to two-phase commit by modifying the RPCs the coordinator can make to the worker in twophase.proto and the corresponding coordinator and worker code.

  3. You will probably want to modify the coordinator and worker to store additional information in their log to enable recovery.

  4. You should write your “recover from crash” code in the constructor (__init__) of the coordinator and/or worker, and this code should read from the log.
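The steps above can be put together in a minimal in-process sketch of two-phase commit. It is not gRPC code: the objects call each other directly. InMemoryLog is a hypothetical stand-in for PersistentLog (only get_last_log_entry() is named in this document, so set_last_log_entry() is an assumed name), and prepare(), commit(), and abort() are hypothetical stand-ins for RPCs you would declare in twophase.proto. The coordinator’s own crash recovery is left out to keep the sketch short.

```python
import json


class InMemoryLog:
    """Hypothetical stand-in for PersistentLog that keeps only one entry."""

    def __init__(self):
        self.entry = None  # a new log has no entry yet

    def get_last_log_entry(self):
        return self.entry

    def set_last_log_entry(self, entry):  # assumed name for the write method
        self.entry = entry  # the previous entry is discarded


class Worker:
    def __init__(self, log):
        # Recovery: rebuild the in-memory state from the last log entry.
        self.log = log
        entry = log.get_last_log_entry()
        state = json.loads(entry) if entry is not None else {}
        self.value = state.get("value")      # last committed value
        self.pending = state.get("pending")  # value this worker voted to accept
        self.prepared = state.get("prepared", False)

    def _save(self):
        # The log keeps one entry, so every write carries the full state.
        self.log.set_last_log_entry(json.dumps(
            {"value": self.value, "pending": self.pending, "prepared": self.prepared}))

    def prepare(self, new_value):
        self.pending, self.prepared = new_value, True
        self._save()  # the vote is durable before the call returns

    def commit(self):
        if self.prepared:  # a repeated commit changes nothing
            self.value, self.pending, self.prepared = self.pending, None, False
            self._save()

    def abort(self):
        self.pending, self.prepared = None, False
        self._save()

    def get_committed(self):
        # While prepared, the outcome is unknown, so report "unavailable".
        if self.prepared:
            return (False, None)
        return (True, self.value)


class Coordinator:
    def __init__(self, log, workers):
        # Recovery of an interrupted transaction is omitted in this sketch.
        self.log, self.workers = log, workers
        entry = log.get_last_log_entry()
        self.value = json.loads(entry)["value"] if entry is not None else None

    def _save(self, pending, phase):
        self.log.set_last_log_entry(json.dumps(
            {"value": self.value, "pending": pending, "phase": phase}))

    def set_value(self, new_value):
        # Phase 1: every worker prepares. Any exception propagates at once;
        # there is no retry loop.
        for worker in self.workers:
            worker.prepare(new_value)
        # Commit point: once this entry is written, the change must commit.
        self._save(new_value, "committing")
        # Phase 2: make the new value visible on every worker.
        for worker in self.workers:
            worker.commit()
        self.value = new_value
        self._save(None, None)


workers = [Worker(InMemoryLog()) for _ in range(2)]
Coordinator(InMemoryLog(), workers).set_value("B")
print([w.get_committed() for w in workers])  # [(True, 'B'), (True, 'B')]
```

A worker that crashes after prepare() comes back from its log still prepared, so it keeps reporting the value as unavailable until the coordinator tells it how the transaction ended.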

Using gRPC

  1. To create or modify a method in an RPC service, you need to

    • add or modify the method in the twophase.proto file for the service
    • add or modify the method in the corresponding class in coordinator.py or worker.py. It should always take exactly three arguments:
      • self (the service object),
      • request (the argument to the method, which is a message declared in twophase.proto), and
      • context, which provides access to utility functions for the RPC system, such as to send back errors

    and then run make to regenerate twophase_pb2.py and twophase_pb2_grpc.py based on twophase.proto.

  2. To return an error from an RPC method, you can use code like

    context.abort(grpc.StatusCode.INTERNAL, 'message')
    

    where grpc.StatusCode.INTERNAL is a status code taken from the list here, and 'message' is a message of your choice. In the client calling the RPC service, this error will turn into a Python exception.

  3. When an RPC method fails, gRPC raises an exception that inherits from grpc.RpcError. You could catch this exception to handle it explicitly, but in my reference implementation, I do not do this. (I just rely on the exception “crashing” my coordinator, and assume that the coordinator will be restarted to recover from this.)

Understanding gRPC errors

If your error message contains something about “too many open files”, see the workaround described in the description of failure_test.

  1. If an exception occurs during a method in a service, then gRPC will catch the exception and return an error from the method, with information about the exception embedded in the error. When the client receives this error, this will result in another exception. Since sometimes our tests will call RPC methods which call other methods, this can lead to rather long errors. For example, in my reference implementation, if I add raise Exception("THIS IS THE ACTUAL PROBLEM") to a worker method called by the coordinator, then I get a stream of error messages like:

    ERROR:grpc._server:Exception calling application: THIS IS THE ACTUAL PROBLEM
    Traceback (most recent call last):
      File "/net/zf14/cr4bd/fall2019/cs4414/hw/2p-py/lib/python3.6/site-packages/grpc/_server.py", line 434, in _call_behavior
        response_or_iterator = behavior(argument, context)
      File "/u/cr4bd/fall2019/cs4414/hw/2p-py/worker.py", line 53, in Commit
        raise Exception("THIS IS THE ACTUAL PROBLEM")
    Exception: THIS IS THE ACTUAL PROBLEM
    

    This part of the error message is written by the worker just before the RPC library sends the error back to the caller.

    ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >
    Traceback (most recent call last):
    

    … (omitted several lines) …

    When the error is received by the coordinator code that made the call, it triggers an exception. This is a printout of that exception. Note that the debug_error_string includes the original exception message along with information about the RPC call that was made and the part of the RPC library which handled it.

    ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >"
    

    (… omitted more lines …)

    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: <_Rendezvous of RPC that terminated with:
            status = StatusCode.UNKNOWN
            details = "Exception calling application: THIS IS THE ACTUAL PROBLEM"
            debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"
    >"
            debug_error_string = "{"created":"@1572716887.781029247","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-forward-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: <_Rendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNKNOWN\n\tdetails = "Exception calling application: THIS IS THE ACTUAL PROBLEM"\n\tdebug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}"\n>","grpc_status":2}"
    >
    

    Since I did not catch the exception in the coordinator, it also triggered an exception in the program that called the coordinator. As a result, we got a third and a fourth, even longer, message.

    You will probably want to look for the first error, or at least one of the early ones, to diagnose what is going on.

    Note that in some of our tests, we deliberately inject communication errors from gRPC. Error messages triggered by these injected errors are likely normal, so do not conclude that such messages indicate a problem without further investigation.

The persistent log

  1. You should use the persistent log to save the state of the coordinator and each worker. This includes both the value they are storing and what stage of the transaction they are in.

    On the workers, in order to return the correct value from GetCommitted, you will need to store the actual string stored. On the coordinator, in order to bring up to date a worker that failed to receive a transaction-starting message, you will likely need to store the actual string as well as information about any transaction in progress.

    Updates to the persistent log are atomic. After an update, you will see either the previous or the new version of the string, never a partially written one.
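As a minimal sketch of what one coordinator entry might hold, assuming (as the atomicity note suggests) that each log entry is a single string: the JSON encoding, the field names, the "preparing"/"committing" phase labels, and the “presumed abort” recovery rule below are our own choices, not part of the supplied code.

```python
import json


def encode_coordinator_state(value, pending=None, phase=None):
    """Pack the coordinator's whole state into one string log entry.

    Assumption: entries are strings; JSON and the field names are our choice.
    The log keeps only its last entry, so the committed value is repeated
    in every entry, even while it is not changing.
    """
    return json.dumps({"value": value, "pending": pending, "phase": phase})


def decode_coordinator_state(entry):
    # get_last_log_entry() returns None for a log that was just created.
    if entry is None:
        return {"value": None, "pending": None, "phase": None}
    return json.loads(entry)


def recovery_action(state):
    """Decide what a restarted coordinator should send to every worker.

    Presumed abort: only a durably logged commit decision is finished;
    a transaction that never reached that point is rolled back.
    """
    if state["phase"] == "committing":
        return ("commit", state["pending"])
    return ("abort", state["value"])


# A crash after logging the decision: the restart finishes the commit.
entry = encode_coordinator_state("A", pending="B", phase="committing")
print(recovery_action(decode_coordinator_state(entry)))  # ('commit', 'B')

# A crash while still collecting votes: the restart aborts and keeps "A".
entry = encode_coordinator_state("A", pending="B", phase="preparing")
print(recovery_action(decode_coordinator_state(entry)))  # ('abort', 'A')
```

Because the whole state is rewritten on every update and updates are atomic, a restarted coordinator always decodes a complete, self-consistent entry.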

Adding messages

  1. You can add new message types to twophase.proto using similar syntax to the MaybeValue declared in our skeleton code:

    message MaybeValue {
        bool available = 1;
        string content = 2;
    }
    

    Each field in the message needs to be assigned a unique number (and this number identifies that field when messages are sent over the network, not its name). Each field can have types like bool, string, int32, float, etc. You can see the full list of types in the protocol buffer documentation along with more complete reference on the supported syntax for messages.

  2. You can add or replace the methods of the worker in twophase.proto that the coordinator uses to communicate with the worker. Our skeleton code uses a SetValue() method that takes the new value as an argument, but this provides the worker too little information to implement its part of the distributed transaction.

Transaction IDs and Sequence Numbers

  1. It is possible for the coordinator to try to send a message to a worker and for the message to appear not to have been sent. When this happens, it is possible that the message does not actually reach the worker, that the message does reach the worker immediately, or that the message reaches the worker sometime later. (Our tests create this situation deliberately. In a real network, it could happen as a result of some component, perhaps between the coordinator and worker machines, trying to resend a message after a failure.)
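A common way to cope with this is to tag each coordinator-to-worker message with a transaction ID so that a worker can recognize duplicated and stale messages. This is a minimal sketch, assuming IDs increase with every SetValue() call; the method names and the txn_id parameter are hypothetical (in the real system they would be fields of messages you add to twophase.proto), and writing this state to the log is omitted for brevity.

```python
class Worker:
    """In-memory worker that ignores duplicated or delayed messages.

    Assumption: every message carries a transaction ID (hypothetical
    txn_id parameter) and the coordinator uses a larger ID for each new
    SetValue() call. Persisting this state to the log is omitted here.
    """

    def __init__(self):
        self.value = None
        self.last_txn = 0    # highest transaction ID prepared so far
        self.pending = None  # (txn_id, new_value) while prepared, else None

    def prepare(self, txn_id, new_value):
        if txn_id <= self.last_txn:
            return  # a repeated or late prepare for an old transaction
        self.last_txn = txn_id
        self.pending = (txn_id, new_value)

    def commit(self, txn_id):
        if self.pending is None or self.pending[0] != txn_id:
            return  # a duplicate commit, or one for a different transaction
        self.value = self.pending[1]
        self.pending = None

    def abort(self, txn_id):
        if self.pending is not None and self.pending[0] == txn_id:
            self.pending = None


worker = Worker()
worker.prepare(1, "A")
worker.commit(1)
worker.prepare(2, "B")
worker.prepare(1, "A")  # delayed copy of an old prepare: ignored
worker.commit(1)        # stale commit: ignored
worker.commit(2)
print(worker.value)  # B
```

Silently ignoring a late message is the simplest choice for a sketch; a real implementation might instead report an error so the coordinator notices.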

Understanding Python OO

gRPC makes extensive use of Python’s support for object orientation. For a more complete introduction to these Python features, see the official tutorial.

We provide a brief introduction here, which is surely less thorough and less carefully checked:

Example class definition and usage

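Here is a definition of a class Rectangle with a constructor (whose width and height default to 0), a method set_width_and_height that changes both dimensions, and a method size that computes the area: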

      class Rectangle:
          def __init__(self, width=0, height=0):
              self.width = width
              self.height = height

          def set_width_and_height(self, new_width, new_height):
              self.width = new_width
              self.height = new_height

          def size(self):
              return self.width * self.height

    

An example of using this class would be like:

      my_rectangle = Rectangle(width=50)
      print("The initial width is", my_rectangle.width)
      my_rectangle.set_width_and_height(100, 200)
      print("Now the width is", my_rectangle.width)
      print("The size of the rectangle is", my_rectangle.size())

    

Which would print out:

      The initial width is 50
      Now the width is 100
      The size of the rectangle is 20000

    

Notice that:

Example of inheritance

Python supports inheritance. For example, to make PositionedRectangle a subclass of Rectangle, we can define it like this:

      class PositionedRectangle(Rectangle):
          def __init__(self, width=0, height=0, x=0, y=0):
              self.x = x
              self.y = y
              super().__init__(width=width, height=height)

          def move_to(self, x, y):
              self.x = x
              self.y = y

    

An example of using this class might be like:

      my_rectangle = PositionedRectangle(x=4, y=100, width=10, height=10)
      my_rectangle.set_width_and_height(50, 50)
      print("the rectangle is at x=", my_rectangle.x, "y=", my_rectangle.y, "and has size", my_rectangle.size())

    

which would output:

      the rectangle is at x= 4 y= 100 and has size 2500