I have code like this:

DataStream<Sample> stream = ...

stream.map(x -> {
    x.setVal(10);
    return x;
});

// Sample class looks like this:
class Sample {
    int val = 5;
    public void setVal(int val) {
        this.val = val;
    }
}

When I print this stream, I see that x.setVal() has no effect: x.val remains 5 instead of becoming 10. I also tried assigning x.val = 10 directly, and that did not work either.

1 Answer

It works with the code below. Did you call env.execute()?

Note that if you don’t call execute(), your application won’t be run.

https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/datastream_api/#a-complete-example

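import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The wrapper class name is illustrative; any class name works.
public class SampleJob {

    static class Sample {
        int val = 5;

        public void setVal(int val) {
            this.val = val;
        }

        @Override
        public String toString() {
            return "My val is " + val;
        }
    }

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Sample> mySamples = env.fromElements(new Sample());
        mySamples.map(sample -> {
                     sample.setVal(10);
                     return sample;
                 })
                 .print();

        // Nothing above runs until execute() is called.
        env.execute();
    }
}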

Log:

09:59:46.838 [Map -> Sink: Print to Std. Out (10/16)#0] DEBUG org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Print to Std. Out (10/16)#0 (bf54e7d1b353295b15654e2448deccf3) [FINISHED]
09:59:46.838 [Map -> Sink: Print to Std. Out (9/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Print to Std. Out (9/16)#0 (7ed1412933822e68476429e2dcb945b4): Releasing SingleInputGate{owningTaskName='Map -> Sink: Print to Std. Out (9/16)#0 (7ed1412933822e68476429e2dcb945b4)', gateIndex=0}.
09:59:46.838 [Map -> Sink: Print to Std. Out (11/16)#0] DEBUG org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Print to Std. Out (11/16)#0 (f051a157bfd41d38ad1d645543384003) [FINISHED]
09:59:46.838 [Map -> Sink: Print to Std. Out (9/16)#0] DEBUG org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Print to Std. Out (9/16)#0 (7ed1412933822e68476429e2dcb945b4) [FINISHED]

7> My val is 10

09:59:46.838 [Source: Collection Source (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Collection Source (1/1)#0 (915eb1a21636f7a00f9b7b37d82225c3): Finished PipelinedSubpartition#14 [number of buffers: 4 (21 bytes), number of buffers in backlog: 0, finished? true, read view? true].
09:59:46.838 [Map -> Sink: Print to Std. Out (14/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - PipelinedResultPartition 85e04b269c1186f0664dc9ee99034cf3#0@915eb1a21636f7a00f9b7b37d82225c3 [PIPELINED_BOUNDED, 16 subpartitions, 4 pending consumptions]: Received consumed notification for subpartition 13.
09:59:46.838 [Map -> Sink: Print to Std. Out (15/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - PipelinedResultPartition 85e04b269c1186f0664dc9ee99034cf3#0@915eb1a21636f7a00f9b7b37d82225c3 [PIPELINED_BOUNDED, 16 subpartitions, 3 pending consumptions]: Received consumed notification for subpartition 14.
09:59:46.838 [Map -> Sink: Print to Std. Out (14/16)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished task Map -> Sink: Print to Std. Out (14/16)#0
09:59:46.838 [Map -> Sink: Print to Std. Out (15/16)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished task Map -> Sink: Print to Std. Out (15/16)#0
09:59:46.838 [Source: Collection Source (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Collection Source (1/1)#0 (915eb1a21636f7a00f9b7b37d82225c3): Finished PipelinedSubpartition#15 [number of buffers: 4 (21 bytes), number of buffers in backlog: 0, finished? true, read view? true].
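
As a rough analogy for why a missing execute() call leaves the value at 5, here is a minimal plain-Java sketch. It uses java.util.stream rather than Flink, so it only illustrates the general idea of a lazily built pipeline and says nothing about Flink internals. The class name LazyPipelineDemo and its two helper methods are made up for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: an analogy using java.util.stream, not Flink.
public class LazyPipelineDemo {

    static class Sample {
        int val = 5;

        void setVal(int val) {
            this.val = val;
        }
    }

    // Declares a map step but never runs the pipeline,
    // similar in spirit to building a job without calling execute().
    static int valWithoutTerminalOp() {
        Sample s = new Sample();
        Stream<Sample> pipeline = Stream.of(s).map(x -> {
            x.setVal(10);
            return x;
        });
        // No terminal operation was invoked, so the lambda has not run.
        return s.val;
    }

    // Runs the same pipeline by adding a terminal operation (collect).
    static int valWithTerminalOp() {
        Sample s = new Sample();
        List<Sample> out = Stream.of(s).map(x -> {
            x.setVal(10);
            return x;
        }).collect(Collectors.toList());
        return out.get(0).val;
    }

    public static void main(String[] args) {
        System.out.println("Without terminal op: " + valWithoutTerminalOp()); // 5
        System.out.println("With terminal op: " + valWithTerminalOp());       // 10
    }
}
```

In both cases the map step is only a description of work until something triggers it. In the Flink program above, env.execute() plays that triggering role.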
