# How to create an RxJS Observable - manually and by use of Creation Operators

Reactive programming is somewhat of a shift in thinking on how you get values from a source of data. Instead of you asking the source every few seconds for an update, the source sends you data you are interested in over time. In other words, we are talking about a stream of data that comes to you over time.

In this first article (in my RxJS series), I want to focus on how to create such a data source.

## Terminologies
- **Observable**: A data source that outputs a stream of data over time.
- **Observer**: A consumer that *subscribes* to listen for data from the observable.

Most of the time you will be observing (as an Observer) on an existing Observable, however, there are cases where you will want to create an Observable yourself. For example, when you want to listen to some DOM events, or, you want to create some mock data for your Unit Test.

## Manual Observable creation
To create an Observable manually, we can use the Observable constructor:

```typescript
import { Observable } from 'rxjs';
let source$ = new Observable(subscriber => {
  subscriber.next("Hello");
  subscriber.next("World");
});
source$.subscribe(console.log);

// Output on console:
// Hello
// World
```
We can use the following two methods on the subscriber parameter:
1. error(): Emit an error, which stops emitting data hence forward.
2. complete(): Completes the source - nothing will be emitted hence forward.

```typescript
let source$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.error('oops');
  subscriber.next(2);  // will not be emitted
});

source$.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log('Done')
});

// Output on console, oops in red:
// 1
// oops
```

## Creation Operators
In RxJS terminologies, operators that let you create an Observable are called Creation Operators.

### of Operator
The of operator takes in a parameter of values that it then streams out one after the other. 
```typescript
import { of } from 'rxjs';

let source$ = of(1,2,3,4);
source$.subscribe(console.log);

// Output on the console will be: 
// 1
// 2
// 3
// 4
```
We need to remember that the *of* operator spits out the values one at a time. It doesn't care if the first value is an array - it does not flatten the array (or do any other manipulation of it).

```typescript
import { of } from 'rxjs';
let source$ = of(1,2,3,[4,5,6]);
source$.subscribe(console.log);

// Output on the console will be:
// 1
// 2
// 3
// [4,5,6]
```

### from operator
This operator works almost similar to the*of* operator, except that it is a bit more intelligent. The from operator looks at the type passed and tries to emit it's content one by one. For example, if you pass it an array, instead of emitting the whole array (like the way 'of' does), from will emit each item in the array one at a time. Note that it does not break down the sub items - so if element 3 is a sub array, it won't break that array down too. If you pass the from operator a string, it will emit the contents of the string one by one.

```typescript
import { from } from 'rxjs';
let source$ = from('test');
source$.subscribe(console.log);

// Output on the console:
// t
// e
// s
// t
```
Compare the output above to the 'of' operator and you should see that the 'of' operator spits out 'test' as the only item it emits. Try *passing a promise* to the from operator and see if it plays nice.

```typescript
import { from } from 'rxjs';
let source$ = from(Promise.resolve('oops'));
source$.subscribe({ next: console.log, error: console.err });

// Output will show in red - telling us that console.err was called!
// oops
```

### interval operator
This operator emits values from 0 onwards in a specified interval spacing. For example, we have a sequence of values emitted every 5 seconds by passing the interval operator some interval time.

```typescript
import { interval } from 'rxjs';
let source$ = interval(5000);
source$.subscribe(console.log);

// Output on the console every 5 seconds, incremented:
// 0
// 1
// 2
// .... n
```

### timer operator
If you want to wait before emitting some value, use this operator. The timer operator takes in two parameters. The first one tells it how long to wait before emitting the first value. The second parameter tells it how much time to wait between subsequent emissions. Again, values start with 0 and keep on incrementing by 1.

```typescript
import { timer } from 'rxjs';
let source$ = timer(1000);
source$.subscribe(console.log);

// Output after 1 second:
// 0
```
Remember, if you specify only the first parameter, the observable will end after emitting the first value. You must specify a second parameter make the values continue indefinitely.

### fromEvent operator
This operator lets you subscribe to DOM events and every time that event fires, your observable emits respective event.

Here is an example of listening to a click event on the document:

```typescript
import { fromEvent } from 'rxjs';
let documentClick$ = fromEvent(document, 'click');
documentClick$.subscribe(console.log);
```
Try using the 'keypress' event and you can imagine how powerful this operator is. You can limit the event to a specific HTML element too. For example, imagine that you have a div tag with an id  of 'container':

```typescript
import { fromEvent } from 'rxjs';
let containerDiv = document.getElementById('container');
let containerClick$ = fromEvent(containerDiv, 'click');
containerClick$.subscribe(console.log);
```

## Important Points
We talked about how Observables are a stream of data source, but there are a few things I must point out:
1. Observables don't emit any data until they are subscribed to. 
2. Subscribing to an Observable is through the 'subscribe' method. However, this doesn't specify when the subscriber can stop. Remember, if there is even a single subscriber listening, the Observable will keep on emitting data (hence a potential memory issue).

To stop an observable from emitting data, one of the three things below must happen:

1. The source errors (subscriber.error()).
2. The source completes (subscriber.complete, or, timer(1000) where the operator completes.
3. The subscriber unsubscribes. Remember, if there are no subscribers, the source will stop.

Every subscribe call returns us a subscription. All we have to do is call 'unsubscribe' on this returned object.

```typescript
import { interval } from 'rxjs';
let source$ = interval(1000);
let subscription = source$.subscribe(console.log);
setTimeout(() => subscription.unsubscribe(), 1000);

// Output
// 0
```
The reason we use setTimeout is because we want to wait for a second so that the source has a chance to emit some value for us to see. 

4. The subscribe method takes in an object with the following three keys:
- next: The callback to fire when the source emits a data value successfully
- error: The callback to fire when an error is fired by the source
- complete: The callback to fire when the Observable completes.

```typescript
let source$ =  // some source observable
let subscription = source$.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.log(err),
  complete: () => console.log('Completed');
});
// In the future, unsubscribe
setTimeout(() => subscription.unsubscribe(), 5000);
```
I purposely kept my examples verbose. You can easily save some lines of code as shown below:

```typescript
import { of } from 'rxjs';
of([1,2,3]).subscribe(console.log).unsubscribe();
```

In an upcoming article in this series, I will go over some important operators that combine multiple Observables to do some very operations!


